## Библиотеки

In [None]:
import io
import os
import datetime as dt
import json
from time import sleep

import pandas as pd
import clickhouse_driver
import vertica_python
from confluent_kafka import Consumer, Producer

from string import Template

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, IntegerType, DoubleType, StringType, TimestampType
)

import boto3

from contextlib import contextmanager
from typing import Generator

import psycopg2
from psycopg2 import connect

from IPython.display import display, clear_output

In [None]:
from dotenv import load_dotenv

# Load credentials (AWS, Kafka, PostgreSQL, Vertica, ClickHouse) into
# os.environ; later cells read them with os.environ[...].
# The DOTENV_PATH environment variable may override the default location;
# $VARS inside the path are expanded.
DOTENV_PATH = os.path.expandvars(
    os.environ.get('DOTENV_PATH', '/mnt/c/Users/igor_/.env')
)
load_dotenv(dotenv_path=DOTENV_PATH)

import logging
logger = logging.getLogger('kafka')
# Only WARNING and above from the logger named 'kafka' (logging.WARN is just
# an alias of logging.WARNING).
# NOTE(review): confirm which Kafka client actually logs through this name.
logger.setLevel(logging.WARNING)

## Бакет в S3

In [None]:
# Credentials come from the environment (KeyError if they are missing).
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

# S3-compatible object storage endpoint and the bucket holding the CSV files.
BUCKET_NAME = 'final-project'
ENDPOINT_URL = 'https://storage.yandexcloud.net'

session = boto3.session.Session()
# boto3 resource pointed at the custom endpoint instead of AWS.
s3_resourse = session.resource(
    's3',
    endpoint_url=ENDPOINT_URL,
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

bucket = s3_resourse.Bucket(name=BUCKET_NAME)

In [None]:
# Print the key and last-modified time of every object in the bucket.
# The collection pages through the listing automatically. A single
# list_objects_v2 call returns at most 1000 keys, and its response has no
# 'Contents' key at all for an empty bucket.
for obj in bucket.objects.all():
    print(obj.key, obj.last_modified)

In [None]:
# Preview every object in the bucket as a CSV: print its shape and first rows.
for obj in bucket.objects.all():
    # Download into memory and rewind so pandas reads from the start.
    buffer = io.BytesIO()
    bucket.download_fileobj(Key=obj.key, Fileobj=buffer)
    buffer.seek(0)
    df = pd.read_csv(buffer)
    print(df.shape)
    display(df.head())

In [None]:
# Fetch currencies_history.csv into an in-memory buffer, then rewind it so
# the next read starts at the first byte.
buffer = io.BytesIO()
bucket.download_fileobj(Key='currencies_history.csv', Fileobj=buffer)
buffer.seek(0)

In [None]:
# Parse the downloaded CSV from the buffer's current position.
df = pd.read_csv(filepath_or_buffer=buffer)

In [None]:
# Read the CSV header line (the comma-separated column names) as text.
# Rewind first: an earlier read_csv may have left the position past the
# data, in which case readline() would return an empty string.
buffer.seek(0)
columns = buffer.readline().decode("utf-8").rstrip()
# Leave the buffer rewound for the next reader.
buffer.seek(0)

In [None]:
# Show the CSV's data rows as a list of lists (header excluded).
# Rewinding first makes the cell independent of where earlier cells left
# the buffer position.
buffer.seek(0)
pd.read_csv(buffer).to_dict('split')['data']

## Spark

In [None]:
# Point Spark's JVM launcher at a local Java 8 installation; this must be set
# before the SparkSession is created in a later cell.
os.environ['JAVA_HOME']='/usr/lib/jvm/java-8-openjdk-amd64'

# Alternative (disabled): register the ClickHouse JDBC driver via findspark.
# import findspark
# findspark.add_packages('com.clickhouse:clickhouse-jdbc:0.4.6')
# findspark.add_jars('/home/ishirokov/jars/clickhouse-jdbc-0.4.6-all.jar')

In [None]:
class SparkConnector:
    """Thin wrapper around a SparkSession with Kafka and PostgreSQL read helpers."""

    def __init__(self, spark_jars_packages, app_name=None) -> None:
        """Create the SparkSession (or reuse an active one via getOrCreate).

        Args:
            spark_jars_packages: comma-separated Maven coordinates passed to
                the ``spark.jars.packages`` setting.
            app_name: Spark application name. Defaults to ``app_YYYYMMDD``,
                using the date at which the connector is constructed.
        """
        if app_name is None:
            # Computed here rather than in the signature: a default expression
            # in the signature is evaluated once, when the class is defined,
            # so connectors created later in a long-lived kernel would carry
            # a stale date.
            app_name = f"app_{dt.datetime.now().strftime('%Y%m%d')}"
        self.spark = (
            SparkSession
            .builder
            .config("spark.sql.session.timeZone", "UTC")
            .config("spark.jars.packages", spark_jars_packages)
            .appName(app_name)
            .getOrCreate()
        )
        self.spark.sparkContext.setLogLevel('ERROR')

    def load_df(self, starting_offset: int, ending_offset: int, kafka_options: dict,
                topic: str = "transaction-service-input",
                partitions=(0, 1)) -> DataFrame:
        """Batch-read a Kafka topic between fixed offsets.

        The same offset is applied to every partition in ``partitions``. In
        Spark's per-partition JSON offsets, -2 means "earliest" and -1 means
        "latest" (-2 is not allowed as an ending offset).

        Args:
            starting_offset: first offset to read on each partition.
            ending_offset: offset to stop at on each partition.
            kafka_options: Spark Kafka source options (servers, auth,
                subscription).
            topic: topic named in the offset JSON; it should match the
                subscription in ``kafka_options``.
            partitions: partition numbers to include in the offset JSON.

        Returns:
            The raw Kafka DataFrame produced by the Spark Kafka source.
        """
        def offsets(offset: int) -> str:
            # e.g. {"transaction-service-input": {"0": 0, "1": 0}}
            return json.dumps({topic: {str(p): offset for p in partitions}})

        return (
            self.spark.read
            .format("kafka")
            .options(**kafka_options)
            .option("startingOffsets", offsets(starting_offset))
            .option("endingOffsets", offsets(ending_offset))
            .option("failOnDataLoss", "false")
            .load()
        )

    def read_stream(self, kafka_options: dict) -> DataFrame:
        """Open a streaming Kafka source that starts at the latest offsets."""
        return (
            self.spark.readStream
            .format("kafka")
            .options(**kafka_options)
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", "false")
            .load()
        )

    def transform_df(self, df: DataFrame, field: str, schema,
                     **kwargs) -> DataFrame:
        """Parse the JSON in column ``field`` with ``schema`` and flatten it.

        The column is cast to string, parsed with ``from_json``, and the
        resulting struct's fields become top-level columns.

        Optional keyword arguments (all three required when any is given):
            id: column used for ``dropDuplicates``.
            watermark: event-time column passed to ``withWatermark``.
            interval: allowed lateness, e.g. ``'60 minutes'``.

        Raises:
            KeyError: if some but not all of the keyword arguments are given.
        """
        parsed = (
            df.withColumn('value', F.col(field).cast('string'))
            .withColumn('data', F.from_json(F.col('value'), schema=schema))
            .selectExpr('data.*')
        )
        if not kwargs:
            return parsed
        # NOTE(review): in Structured Streaming, deduplication state is only
        # bounded when withWatermark is applied *before* dropDuplicates and
        # the event-time column is among the dedup keys; here the watermark
        # is applied afterwards. Confirm the intended semantics.
        return (
            parsed.dropDuplicates([kwargs['id']])
            .withWatermark(kwargs['watermark'], kwargs['interval'])
        )

    def read_pg_table(self, table: str, spark: SparkSession,
                      pg_settings) -> DataFrame:
        """Batch-read a PostgreSQL table over JDBC.

        Args:
            table: value for the JDBC ``dbtable`` option.
            spark: unused; kept for backward compatibility (``self.spark``
                is used instead).
            pg_settings: extra JDBC options; presumably url/user/password,
                to be confirmed against callers.
        """
        return (
            self.spark.read
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("dbtable", table)
            .options(**pg_settings)
            .load()
        )

In [None]:
# Maven packages Spark fetches at session start: the PostgreSQL JDBC driver
# and the Kafka connector for Spark SQL / Structured Streaming.
spark_jars_packages = ",".join((
    "org.postgresql:postgresql:42.4.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
))

# JAAS line for SASL/SCRAM authentication; $kafka_pass is substituted below.
kafka_pass = Template(
    'org.apache.kafka.common.security.scram.ScramLoginModule required '
    'username="producer_consumer" password="$kafka_pass";'
)

# Options for Spark's Kafka source: topic, broker, SASL_SSL auth and the
# truststore used to verify the broker certificate.
kafka_options = {
    'subscribe': 'transaction-service-input',
    'kafka.bootstrap.servers': 'rc1a-083hhcof7uqe71hd.mdb.yandexcloud.net:9091',
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'SCRAM-SHA-512',
    'kafka.sasl.jaas.config': kafka_pass.substitute(
        kafka_pass=os.environ['KAFKA_PASSWORD'],
    ),
    'kafka.ssl.truststore.location': '/mnt/c/ca/YandexInternalRootCA.jks',
    'kafka.ssl.truststore.password': os.environ['TRUSTSTORE_PASSWORD'],
}

spark = SparkConnector(spark_jars_packages)

In [None]:
# Mirror the Kafka topic into the in-memory table `query_0` and refresh its
# status and contents once a second until the query ends or the kernel is
# interrupted.
df = spark.read_stream(kafka_options)

q = (
    df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("query_0")
    .start()
)

try:
    # Loop only while the query runs, instead of spinning forever after it dies.
    while q.isActive:
        clear_output(wait=True)
        display(q.status)
        # DataFrame.show() prints the rows itself and returns None, so it is
        # called directly rather than wrapped in display().
        spark.spark.sql('SELECT * FROM query_0').show()
        sleep(1)
    # The loop ends on its own only if the query terminated; surface why.
    if q.exception() is not None:
        print(q.exception())
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to leave the preview loop.
    pass
finally:
    # Always release the streaming query (and its Kafka consumer).
    q.stop()

In [None]:
# Envelope of each Kafka message: the business object travels as a JSON
# string in `payload`, and its kind is given by `object_type`.
msg_schema = (
    StructType()
    .add("object_id", StringType(), nullable=True)
    .add("object_type", StringType(), nullable=True)
    .add("sent_dttm", TimestampType(), nullable=True)
    .add("payload", StringType(), nullable=True)
)

# Shape of the `payload` JSON when object_type == 'TRANSACTION'.
transaction_schema = (
    StructType()
    .add("operation_id", StringType(), nullable=True)
    .add("account_number_from", IntegerType(), nullable=True)
    .add("account_number_to", IntegerType(), nullable=True)
    .add("currency_code", IntegerType(), nullable=True)
    .add("country", StringType(), nullable=True)
    .add("status", StringType(), nullable=True)
    .add("transaction_type", StringType(), nullable=True)
    .add("amount", IntegerType(), nullable=True)
    .add("transaction_dt", TimestampType(), nullable=True)
)

# Batch-read from offset 0 up to the latest offset on each configured partition.
df = spark.load_df(0, -1, kafka_options)

# Parse envelopes, deduplicate by object_id and set a 60-minute watermark on sent_dttm.
kwargs = dict(id='object_id', watermark='sent_dttm', interval='60 minutes')
tr_df = spark.transform_df(df, 'value', msg_schema, **kwargs)

# Keep transaction envelopes only, then unpack their payloads.
tr_df = tr_df.where(F.col('object_type') == F.lit('TRANSACTION'))
transactions = spark.transform_df(tr_df, 'payload', transaction_schema)

## PostgreSQL to Kafka

In [None]:
class PgConnect:
    """PostgreSQL connection settings plus a transactional connection context manager."""

    def __init__(self, host: str, port: int, db_name: str, user: str, pw: str, sslmode: str = "require") -> None:
        self.host = host
        self.port = port
        self.db_name = db_name
        self.user = user
        self.pw = pw  # password; kept in memory and embedded in url()
        self.sslmode = sslmode

    @staticmethod
    def _quote(value) -> str:
        """Quote one value for a libpq keyword/value connection string.

        The value is wrapped in single quotes, with backslashes and single
        quotes escaped by a backslash. This allows values that contain
        spaces, quotes or are empty.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def url(self) -> str:
        """Return a libpq keyword/value connection string for these settings.

        Every value is quoted, so passwords (or other values) with spaces or
        quotes do not corrupt the string.
        """
        params = {
            "host": self.host,
            "port": self.port,
            "dbname": self.db_name,
            "user": self.user,
            "password": self.pw,
            # Only accept a server that allows writes (e.g. the primary).
            "target_session_attrs": "read-write",
            "sslmode": self.sslmode,
        }
        return " ".join(f"{key}={self._quote(value)}" for key, value in params.items())

    @contextmanager
    def connection(self) -> Generator["psycopg2.extensions.connection", None, None]:
        """Yield an open connection.

        On normal exit the transaction is committed. On any exception it is
        rolled back and the exception re-raised. The connection is always
        closed.
        """
        conn = psycopg2.connect(self.url())
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
            
            
db = PgConnect(
    host="rc1b-w5d285tmxa8jimyn.mdb.yandexcloud.net",
    port=6432,
    db_name="db1",
    user=os.environ['PG_USER'],
    pw=os.environ['PG_PASSWORD'],
    sslmode="require",
)

# Fetch every transaction row. `res` is a list of tuples in the column order
# of the SELECT. No explicit commit is needed: db.connection() commits when
# the block exits normally.
with db.connection() as conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT operation_id, account_number_from, account_number_to, currency_code, country, status, transaction_type, amount, transaction_dt
        FROM public.transactions;
        """
    )
    res = cur.fetchall()

In [None]:
def error_callback(err):
    """Print errors reported by the Kafka client (registered as ``error_cb``)."""
    print('Error message: {}'.format(err))


class KafkaProducer:
    """Minimal JSON producer for one Kafka topic over SASL_SSL / SCRAM-SHA-512."""

    def __init__(self, host: str, port: int, user: str, password: str, topic: str, cert_path: str) -> None:
        params = {
            'bootstrap.servers': f'{host}:{port}',
            'security.protocol': 'SASL_SSL',
            'ssl.ca.location': cert_path,  # CA certificate used to verify the broker
            'sasl.mechanism': 'SCRAM-SHA-512',
            'sasl.username': user,
            'sasl.password': password,
            'error_cb': error_callback,
        }

        self.topic = topic
        self.p = Producer(params)

    def produce(self, payload, wait: bool = True) -> None:
        """Serialize ``payload`` with json.dumps and send it to ``self.topic``.

        By default the call then flushes, waiting up to 10 s, as before.
        Pass ``wait=False`` when sending many messages and call ``flush()``
        once at the end.
        """
        self.p.produce(self.topic, json.dumps(payload))
        # Serve pending delivery/error callbacks without blocking.
        self.p.poll(0)
        if wait:
            self.flush(10)

    def flush(self, timeout: float = 10) -> int:
        """Wait up to ``timeout`` seconds for queued messages to be delivered.

        Returns:
            The number of messages still queued (0 means everything was sent).
            A non-zero count is reported instead of being silently dropped.
        """
        remaining = self.p.flush(timeout)
        if remaining:
            print('Warning: {} message(s) still undelivered after {} s'.format(remaining, timeout))
        return remaining

In [None]:
# Producer for the same topic the Spark cells read from; authenticates with the
# producer_consumer account and verifies the broker against the Yandex root CA.
k_prod = KafkaProducer(
    host='rc1a-083hhcof7uqe71hd.mdb.yandexcloud.net',
    port=9091,
    user='producer_consumer',
    password=os.environ['KAFKA_PASSWORD'],
    topic='transaction-service-input',
    cert_path='/mnt/c/ca/YandexInternalRootCA.crt',
)

In [None]:
# Column order of the SELECT that produced `res`.
columns = [
    'operation_id',
    'account_number_from',
    'account_number_to',
    'currency_code',
    'country',
    'status',
    'transaction_type',
    'amount',
    'transaction_dt'
]


def _json_safe(value):
    """Return ``value`` in a form json.dumps accepts: dates and datetimes become ISO-8601 strings."""
    # datetime.datetime is a subclass of datetime.date, so one check covers both.
    if isinstance(value, dt.date):
        return value.isoformat()
    return value


# Publish every PostgreSQL row fetched into `res` to Kafka as a JSON object
# keyed by column name.
# NOTE(review): the Spark cells parse messages with msg_schema
# (object_id/object_type/sent_dttm/payload) and keep only
# object_type == 'TRANSACTION'; these bare transaction dicts do not carry
# that envelope. Confirm the intended message format.
# NOTE(review): if any column arrives as decimal.Decimal (PostgreSQL NUMERIC),
# json.dumps will still reject it. Confirm the column types.
for row in res:
    k_prod.produce({name: _json_safe(value) for name, value in zip(columns, row)})

## Vertica

In [None]:
# Vertica DWH connection settings; statements are auto-committed.
# Credentials come from the environment (KeyError if missing).
conn_info = dict(
    host='51.250.75.20',
    port=5433,
    user=os.environ['VERTICA_USER'],
    password=os.environ['VERTICA_PASSWORD'],
    database='dwh',
    autocommit=True,
)

In [None]:
# List the distinct names of Vertica tables whose name starts with 'tran'.
# Relies on conn_info holding the Vertica settings (the ClickHouse cell below
# rebinds the same name).
with vertica_python.connect(**conn_info) as conn, conn.cursor() as cur:
    # The LIKE pattern's string literal must be closed: 'tran%'.
    cur.execute("SELECT DISTINCT table_name FROM all_tables WHERE table_name LIKE 'tran%'")
    print(cur.fetchall())

## ClickHouse

In [None]:
# ClickHouse connection settings over TLS (secure native port 9440), verified
# against the Yandex root CA. This rebinds conn_info, replacing the Vertica
# settings from the cell above.
conn_info = {
    'host': 'rc1a-pcouajriqbdi97o4.mdb.yandexcloud.net',
    'port': 9440,
    'database': 'dwh',
    'user': os.environ['CH_USER'],
    'password': os.environ['CH_PASSWORD'],
    'secure': True,
    'ca_certs': '/mnt/c/ca/YandexInternalRootCA.crt',
}

In [None]:
# Smoke-test the ClickHouse connection by printing the tables of the
# configured database.
with clickhouse_driver.connect(**conn_info) as conn, conn.cursor() as cur:
    cur.execute('SHOW TABLES')
    print(cur.fetchall())