## DB connection

In [25]:
# Импортируем библиотеку, соответствующую типу нашей базы данных 
import psycopg2

# Создаем соединение с нашей базой данных

conn = psycopg2.connect(host=hostname, user=username, password=password, dbname=database)

# Использование with в psycopg2

# with psycopg2.connect(host=hostname, user=username, password=password, dbname=database) as conn:
#     with conn.cursor() as cur:

# Создаем курсор - это специальный объект который делает запросы и получает их результаты
cursor = conn.cursor()

# Делаем SELECT запрос к базе данных, используя обычный SQL-синтаксис
cursor.execute("SELECT Name FROM Artist ORDER BY Name LIMIT 3")

# Получаем результат сделанного запроса
results = cursor.fetchall()
results2 =  cursor.fetchall()

print(results)   # [('A Cor Do Som',), ('Aaron Copland & London Symphony Orchestra',), ('Aaron Goldberg',)]
print(results2)  # []

# Делаем INSERT запрос к базе данных, используя обычный SQL-синтаксис
cursor.execute("insert into Artist values (Null, 'A Aagrh!') ")

# Если мы не просто читаем, но и вносим изменения в базу данных - необходимо сохранить транзакцию
conn.commit()

# Проверяем результат
cursor.execute("SELECT Name FROM Artist ORDER BY Name LIMIT 3")
results = cursor.fetchall()
print(results)  # [('A Aagrh!',), ('A Cor Do Som',), ('Aaron Copland & London Symphony Orchestra',)]

# Разбиваем запрос на несколько строк в тройных кавычках

cursor.execute("""
  SELECT name
  FROM Artist
  ORDER BY Name LIMIT 3
""")

# Объединяем запросы к базе данных в один вызов метода

cursor.executescript("""
 insert into Artist values (Null, 'A Aagrh!');
 insert into Artist values (Null, 'A Aagrh-2!');
""")

# C подставновкой по порядку на места знаков вопросов:
cursor.execute("SELECT Name FROM Artist ORDER BY Name LIMIT ?", ('2'))

# И с использованием именнованных замен:
cursor.execute("SELECT Name from Artist ORDER BY Name LIMIT :limit", {"limit": 3})

# Делаем множественную вставку строк проходя по коллекции с помощью метода курсора .executemany()

# Обратите внимание, даже передавая одно значение - его нужно передавать кортежем!
# Именно по этому тут используется запятая в скобках!
new_artists = [
    ('A Aagrh!',),
    ('A Aagrh!-2',),
    ('A Aagrh!-3',),
]
cursor.executemany("insert into Artist values (Null, ?);", new_artists)

# Получаем результаты по одному, используя метод курсора .fetchone()

# Он всегда возвращает кортеж или None. если запрос пустой.

cursor.execute("SELECT Name FROM Artist ORDER BY Name LIMIT 3")
print(cursor.fetchone())    # ('A Cor Do Som',)
print(cursor.fetchone())    # ('Aaron Copland & London Symphony Orchestra',)
print(cursor.fetchone())    # ('Aaron Goldberg',)
print(cursor.fetchone())    # None

# Использование курсора как итератора
for row in cursor.execute('SELECT Name from Artist ORDER BY Name LIMIT 3'):
        print(row)

#Повышаем устойчивость кода        
        
try:
    cursor.execute(sql_statement)
    result = cursor.fetchall()
except sqlite3.DatabaseError as err:       
    print("Error: ", err)
else:
    conn.commit()
    
# Ипользование row_factory    
    
def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print(cur.fetchone()["a"])    

# Не забываем закрыть соединение с базой данных
conn.close()

NameError: name 'hostname' is not defined

## Создание коннекта

In [None]:
def create_connection(db_name, db_user, db_password, db_host, db_port):
    connection = None
    try:
        connection = psycopg2.connect(
            database=db_name,
            user=db_user,
            password=db_password,
            host=db_host,
            port=db_port,
        )
        print("Connection to PostgreSQL DB successful")
    except OperationalError as e:
        print(f"The error '{e}' occurred")
    return connection

In [None]:
connection = create_connection("postgres", "postgres", "abc123", "127.0.0.1", "5432")

## Выполнение запроса

In [None]:
def execute_query(connection, query):
    connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Query executed successfully")
    except OperationalError as e:
        print(f"The error '{e}' occurred")

In [None]:
create_users_table = """
CREATE TABLE IF NOT EXISTS users (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL, 
  age INTEGER,
  gender TEXT,
  nationality TEXT
)
"""

execute_query(connection, create_users_table)

## Создание таблицы

In [None]:
def create_table():
    """
    Creates a "users" table in the python_app database. 
    """
    connection = False

    try:
        ##### Establishes a connection with the python_app database and creates a cursor object that will be used to execute SQL commands #####
        connection = psycopg2.connect(host = "127.0.0.1", database = "python_app", port = "5432", user = "postgres", password = "randompassword")
        cursor = connection.cursor()

        ##### Create a database table ###
        create_table_query = '''CREATE TABLE users
                (id BIGSERIAL NOT NULL PRIMARY KEY,
                username VARCHAR(50) NOT NULL,
                email TEXT NOT NULL UNIQUE,
                password_hash TEXT NOT NULL); '''
        
        cursor.execute(create_table_query)
        connection.commit()
        print("Users table added to python_app database.")

    except (Exception, Error) as error:
        print("An error occured while trying to connect to the python_app database")

    finally:
        if connection:
            cursor.close()
            connection.close()
            print("Connection to python_app database has now been closed")

##### Main Code #####
create_table()

## Вставка записей

In [None]:
posts = [
    ("Happy", "I am feeling very happy today", 1),
    ("Hot Weather", "The weather is very hot today", 2),
    ("Help", "I need some help with my work", 2),
    ("Great News", "I am getting married", 1),
    ("Interesting Game", "It was a fantastic game of tennis", 5),
    ("Party", "Anyone up for a late-night party today?", 3),
]

post_records = ", ".join(["%s"] * len(posts))

insert_query = (
    f"INSERT INTO posts (title, description, user_id) VALUES {post_records}"
)

connection.autocommit = True
cursor = connection.cursor()
cursor.execute(insert_query, posts)

## Выбор данных

In [None]:
def execute_read_query(connection, query):
    cursor = connection.cursor()
    result = None
    try:
        cursor.execute(query)
        result = cursor.fetchall()
        return result
    except OperationalError as e:
        print(f"The error '{e}' occurred")

select_users = "SELECT * FROM users"
users = execute_read_query(connection, select_users)

for user in users:
    print(user)

## Обновление данных

In [None]:
update_post_description = """
UPDATE
  posts
SET
  description = "The weather has become pleasant now"
WHERE
  id = 2
"""

execute_query(connection, update_post_description)

## Удаление данных

In [None]:
delete_comment = "DELETE FROM comments WHERE id = 5"
execute_query(connection, delete_comment)

## Работа с датами

In [None]:
import psycopg2
import datetime
from psycopg2 import Error

try:
    # Подключиться к существующей базе данных
    connection = psycopg2.connect(user="postgres",
                                  # пароль, который указали при установке PostgreSQL
                                  password="1111",
                                  host="127.0.0.1",
                                  port="5432",
                                  database="postgres_db")

    cursor = connection.cursor()
    # Выполнение SQL-запроса для вставки даты и времени в таблицу
    insert_query = """ INSERT INTO item (item_Id, item_name, purchase_time, price)
                              VALUES (%s, %s, %s, %s)"""
    item_purchase_time = datetime.datetime.now()
    item_tuple = (12, "Keyboard", item_purchase_time, 150)
    cursor.execute(insert_query, item_tuple)
    connection.commit()
    print("1 элемент успешно добавлен")

    # Считать значение времени покупки PostgreSQL в Python datetime
    cursor.execute("SELECT purchase_time from item where item_id = 12")
    purchase_datetime = cursor.fetchone()
    print("Дата покупки товара", purchase_datetime[0].date())
    print("Время покупки товара", purchase_datetime[0].time())

except (Exception, Error) as error:
    print("Ошибка при работе с PostgreSQL", error)
finally:
    if connection:
        cursor.close()
        connection.close()
        print("Соединение с PostgreSQL закрыто")

## Функции PostgreSQL

In [None]:
CREATE FUNCTION reffunc(refcursor) RETURNS refcursor AS $$
BEGIN
    OPEN $1 FOR SELECT col FROM test;
    RETURN $1;
END;
$$ LANGUAGE plpgsql;
You can read the cursor content by calling the function with a regular, non-named, Psycopg cursor:

cur1 = conn.cursor()
cur1.callproc('reffunc', ['curname'])
and then use a named cursor in the same transaction to “steal the cursor”:

cur2 = conn.cursor('curname')
for record in cur2:     # or cur2.fetchone, fetchmany...
    # do something with record
    pass

## Соответствие типов данных Python и PostgreSQL

## SQL Formatter

In [26]:
example_sql = """
create or replace table mytable as -- mytable example
seLecT a.asdf, b.qwer, -- some comment here
c.asdf, -- some comment there
b.asdf2 frOm table1 as a leFt join 
table2 as b -- and here a comment
    on a.asdf = b.asdf  -- join this way
    inner join table3 as c
on a.asdf=c.asdf
whEre a.asdf= 1 -- comment this
anD b.qwer =2 and a.asdf<=1 --comment that
or b.qwer>=5
groUp by a.asdf
"""

In [30]:
from sql_formatter.core import format_sql
print(format_sql(example_sql))

CREATE OR REPLACE TABLE mytable AS -- mytable example
SELECT a.asdf,
       b.qwer, -- some comment here
       c.asdf, -- some comment there
       b.asdf2
FROM   table1 as a
    LEFT JOIN table2 as b -- and here a comment
        ON a.asdf = b.asdf -- join this way
    INNER JOIN table3 as c
        ON a.asdf = c.asdf
WHERE  a.asdf = 1 -- comment this
   and b.qwer = 2
   and a.asdf <= 1 --comment that
    or b.qwer >= 5
GROUP BY a.asdf


## Logging

In [1]:
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.debug('Start')
logging.warning('test')

In [None]:
import os
import random 
import logging

current_filename = os.path.basename(__file__).rsplit('.', 1)[0]

# changing level we can change frome what level we want to log the events

logging.basicConfig(filename = current_filename + '.log', level = logging.INFO, format = '%(asctime)s:%(levelname)s:%(message)s')


if __name__ == '__main__':
    logging.info('Generate some random integers')

    num_numbers = 10
    min_value = 0
    max_value = 20
    warning_threshold = 5
    error_threshold = 10
    critical_threshold = 15

    logging.debug('Numbers: {}\nMin value: {}\nMax value: {}\nWaring threshold: {}\nError threshold: {}\nCritical threshold: {}'.format(num_numbers, min_value, max_value, warning_threshold, error_threshold, critical_threshold))

    logging.debug('Start')
    for i in range(num_numbers):
        logging.debug('Iteration: {}'.format(i))
        value = random.randint(min_value, max_value)

        try:
            logging.debug('\tTry value')
            if value > critical_threshold:
                raise Exception('\tValue: {} -> Critical!'.format(value))
            elif value > error_threshold:
                logging.error('\tValue: {} -> Error!'.format(value))
            elif value > warning_threshold:
                logging.warning('\tValue: {} -> Warning!'.format(value))
            else:
                logging.info('\tValue: {}'.format(value))
        except Exception as e:
            logging.critical(e)
            exit()
        finally:
            logging.debug('The try except is finished')

    logging.debug('End')
else:
    pass

## psycopg2 connection v1

In [None]:
import os
import psycopg2
from sqlalchemy import create_engine
import sys
import traceback


class PostgreSQL_conn():
    def __init__(self, params_dict=None):
        if params_dict is None:
            self.params_dict = {
                'user': os.environ['USERNAME'],
                'password': os.environ['POSTGRESQL_PASSWORD'],
                'host': '***',
                'port': '**',
                'database': '**'
            }
        else:
            self.params_dict = params_dict
    
    def show_exception(self, err):
        err_type, err_obj, traceback = sys.exc_info()    
        line_n = traceback.tb_lineno    
        print("\npsycopg2 ERROR:", err, "on line number:", line_n)
        print("psycopg2 traceback:", traceback, "-- type:", err_type) 
        print(err)   
        
    def connect(self):
        try:
            conn = psycopg2.connect(**self.params_dict)
            cursor = conn.cursor()
        except Exception as err:
            self.show_exception(err)        
            conn = None
            cursor = None
        self.conn, self.cursor = conn, cursor
        return conn, cursor
    
    def to_alchemy(self, df, table_name, mode='append', db='postgresql', schema='analytics'):
        try:
            url = '{}+psycopg2://{}:{}@{}:{}/{}'.format(
                db,
                self.params_dict['user'],
                self.params_dict['password'],
                self.params_dict['host'],
                self.params_dict['port'],
                self.params_dict['database']
            )
            engine = create_engine(url)
            df.to_sql(
                table_name,
                schema = schema,
                con = engine, 
                index = False, 
                if_exists = mode
            )
        except Exception as err:
            self.show_exception(err)
    
    def create_table(self, table_name, attrs_sql):
        self.cursor.execute(f'CREATE TABLE {table_name} ({attrs_sql}) IF NOT EXISTS')
        

## psycopg2 connection v2

In [None]:
import psycopg2
from psycopg2 import Error
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
try:
    # Подключение к существующей базе данных
    connection = psycopg2.connect(user="postgres",
                                  # пароль, который указали при установке PostgreSQL
                                  password="1111",
                                  host="127.0.0.1",
                                  port="5432")
    connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    # Курсор для выполнения операций с базой данных
    cursor = connection.cursor()
    sql_create_database = 'create database postgres_db'
    cursor.execute(sql_create_database)
except (Exception, Error) as error:
    print("Ошибка при работе с PostgreSQL", error)
finally:
    if connection:
        cursor.close()
        connection.close()
        print("Соединение с PostgreSQL закрыто")

## Spark Class

In [None]:
import os
import sys
import time
from datetime import datetime


class SparkHelper:
    """Класс содержит функции выгрузки и загрузки данных через спарк"""

    def create_sql_context(self, config=None, name='None', port='4440', instances=5, n_cores=2, spark_version='2.4',  name_nodes=['CLSKLCIB', 'CLSKLSBX', 'SUPERCLUSTER'], executor_memory='10g', driver_memory='10g'):
        """
        Создание спарк контекста

        config:
            Конфиг контекста
        name:
            имя процесса
        port:
            spark.ui.port
        instances:
            кол-во instance
        n_cores:
            кол-во cores
        spark_version:
            '2.1', '2.2', '3'

        """
        if spark_version=='3':
            print('spark_version :' +spark_version)
            os.environ['SPARK_MAJOR_VERSION'] = '3'
            os.environ['SPARK_HOME'] = '/usr/sdp/3.4.0.1-1/spark3/'
            os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'
            os.environ['PYSPARK_PYTHON'] = '/data/parcels/PYENV.ZNO20008661.Spark3.0.1-3.5.pyenv.p0.1/bin/python'
            os.environ['LD_LIBRARY_PATH'] = '/opt/python/virtualenv/jupyter/lib'

            sys.path.insert(0, '/usr/sdp/3.4.0.1-1/spark3/python/')
            sys.path.insert(0, '/usr/sdp/3.4.0.1-1/spark3/python/lib/py4j-0.10.7-src.zip')
        else:
            print('spark_version :' +spark_version)
            os.environ['SPARK_MAJOR_VERSION'] = '2'
            os.environ['SPARK_HOME'] = '/usr/sdp/3.4.0.1-1/spark2/'
            os.environ['PYSPARK_PYTHON'] = '/data/parcels/PYENV.ZNO20008661-3.5.pyenv.p0.20/bin/python'
            os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'
            os.environ['LD_LIBRARY_PATH'] = '/opt/python/virtualenv/jupyter/lib'
            
            sys.path.insert(0, '/usr/sdp/3.4.0.1-1/spark2/python/')
            sys.path.insert(0, '/usr/sdp/3.4.0.1-1/spark2/python/lib/py4j-0.10.7-src.zip')

        from pyspark import SparkContext, SparkConf, HiveContext

        if config is None:
            config = SparkConf().setMaster("yarn").setAppName(name)

            config.setAll(
                [  
                   ('spark.local.dir', 'sparktmp'),
                   ('hive.exec.dynamic.partition.mode', 'nonstrict'),
                   ('spark.executor.memory',executor_memory),
                   ('spark.driver.memory',driver_memory),
                   ('spark.driver.maxResultSize','15g'),
                   ('spark.executor.instances', instances),
                   ('spark.default.parallelism', '1000'),
                   ('spark.port.maxRetries', '500'),
                   ('spark.executor.cores', n_cores),
                   ('spark.dynamicAllocation.enabled', 'false'),
                   ('spark.ui.port', port),
                   ('spark.blacklist.enabled', 'false'),
                   ('spark.sql.shuffle.partitions', '400'),
                   ('spark.blacklist.task.maxTaskAttemptsPerNode', '15'),
                   ('spark.blacklist.task.maxTaskAttemptsPerExecutor', '15'),
                   ('spark.task.maxFailures', '50'),
                   ('spark.yarn.access.namenodes', 'hdfs://clsklcib:8020,hdfs://clsklsbx:8020,hdfs://hdfsgw:8020,hdfs://nsld3:8020,hdfs://nsld3:8020,hdfs://clsklrozn:8020,hdfs://clsklarnsdpsbx:8020',)
                ])

        print('Start')
        self.sc = SparkContext.getOrCreate(conf=config)
        self.sqlContext = HiveContext(self.sc)
        print('Context ready: %s' % self.sc)

        return self.sc, self.sqlContext


    def save_to_csv(self, df, sep:str, username:str,  hdfs_path:str, local_path:str=None, isHeader='true'):
        """
        Сохраняет Spark DataFrame с csv и создает линк на этот файл в файловой системе Jupyter

        Parameters
        ----------
        username:
            Имя пользователя в ЛД
        hdfs_path:
            Путь для сохранения файла в HDFS относительно папки пользователя (например notebooks/data)
        local_path:
            Путь, по которому будет доступен файл в файловой системе Jupyter (/home)
            Если None - запись производится только в hdfs
        """
        import subprocess
        import os

        df.write \
            .format('com.databricks.spark.csv') \
            .mode('overwrite') \
            .option('sep', sep) \
            .option('header', isHeader) \
            .option("quote", '\u0000') \
            .save(hdfs_path)

        if local_path!=None:
            path_to_hdfs = os.path.join('/user', username, hdfs_path)
            path_to_local = os.path.join(local_path)
            proc = subprocess.Popen(['hdfs', 'dfs', '-getmerge', path_to_hdfs, path_to_local])
            proc.communicate()

            columns_row = sep.join(df.columns)
            os.system("sed -i -e 1i'" + columns_row + "\\' " + path_to_local)


    def read_from_csv(self, hdfs_path:str, schema=None):
        """
        Чтения из csv в Spark Dataframe
        Parameters
        ----------
        hdfs_path:
            Путь файла в HDFS относительно папки пользователя (например notebooks/data)
        schema:
            Схема таблицы
        """

        if schema is None:
            return sqlContext.read.format('com.databricks.spark.csv') \
            .option('inferSchema', 'true') \
            .option('header', 'true') \
            .option('delimiter', ';') \
            .option('decimal', '.') \
            .option('dateFormat', 'yyyy-MM-dd') \
            .option('encoding', 'cp1251') \
            .load(hdfs_path)
        else:
            return sqlContext.read.format('com.databricks.spark.csv') \
                .schema(schema) \
                .option('header', 'true') \
                .option('delimiter', ';') \
                .option('decimal', '.') \
                .option('dateFormat', 'yyyy-MM-dd') \
                .option('encoding', 'cp1251') \
                .load(hdfs_path)


    def create_table_from_select(self, db_in='t_ural_kb', table_name='', db_out='t_ural_kb', prefix='0_'):
        """
        Для копирования таблиц из схемы в схему

        Parameters
        ----------

        db_in:
             схема из которой нужно копировать
        table:
             копируемая таблица
        db_out:
             схема в которую копируем
        """

        start=time.time()
        print("Start copying in {db_in}.{table} to {db_out}.{table}".format(db_in=db_in, table=prefix+table_name, db_out=db_out))

        self.sqlContext.sql('DROP TABLE IF EXISTS {db_out}.{tab} purge'.format(db_out=db_out, tab=prefix+table_name))
        sql_query="""create table {db_out}.{prefix}{table}
                            as
                            select *
                            from {db_in}.{table}""".format(db_in=db_in, table=table_name, prefix=prefix, db_out=db_out)

        self.sqlContext.sql(sql_query)

        print("End copying in {db_in}.{table} to {db_out}.{table} ...  {t:0.2f} seconds ".format(db_in=db_in, table=prefix+table_name, db_out=db_out, t=(start-time.time())))

    def save_to_hive(self, df, db='t_ural_kb', table_name='temp_{d:}'.format(d=datetime.now().date()).replace('-', '_')):
        """
        Сохранение spark датафрейма в hive
        Parameters
        ----------
        df:
            spark датафрейм для записи в таблицу
        db:
            схема в которой создается таблица
        table_name:
            название таблицы, если не указано temp_{текущая дата} (temp_01_07_2019)
        """
        start=time.time()
        df.registerTempTable(table_name)
        self.sqlContext.sql('DROP TABLE IF EXISTS {db}.{tab} purge'.format(db=db, tab=table_name))
        self.sqlContext.sql('CREATE TABLE {db}.{tab} SELECT * FROM {tab}'.format(db=db, tab=table_name))
        print("End creating {db}.{table}...  {t:0.2f} seconds ".format(db=db, table=table_name, t=(time.time()-start)))


    def get_table (self, db, table_name, columns=[]):
        """
        Селект таблицы
        Parameters
        ----------
        db:
            схема из которой селектится таблица
        table_name:
            название таблицы
        columns:
            список колонок для селекта, если пустой *
        """
        return self.sqlContext.sql('SELECT {cols} FROM {db}.{tab}'.format(db=db, tab=table_name, cols=', '.join(columns) if columns!=[] else '*'))

    def add_prefix_col_name(self, df, prefix):
        """
        Добавление префикса ко всем названиям колонок в таблице
        Parameters
        ----------
        df:
            датафрейм в котором нужно переименовать столбцы
        prefix:
            префикс для столбцов

        """
        for col in df.columns:
            df=df.withColumnRenamed(col, prefix+'_'+col)
        return df

    def cast_columns(self, df, columns_types):
        """
        Приведение типов к списку колонок
        Parameters
        ----------
        df:
            спарк датафрейм
        columns_types:
            словарь соответствий колонка : тип к которому нужно привести колонку
        """
        from pyspark.sql import functions  as F
        for col_name, type_col in columns_types.items():
            df=df.withColumn(col_name, F.col(col_name).cast(type_col))

        return df

