In [1]:
import os

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col

In [2]:
# Create (or reuse) the Spark session. `spark.jars` puts the PostgreSQL JDBC driver
# (source database) and the ClickHouse JDBC driver (report database) on the classpath.
spark = (
    SparkSession.builder
    .appName('ClickHouse Reports')
    .config('spark.jars', '/opt/spark/jars/postgresql-42.7.5.jar,/opt/spark/jars/clickhouse-jdbc-0.4.6-all.jar')
    .getOrCreate()
)

In [3]:
# Connection settings for the PostgreSQL source and the ClickHouse target.
# URLs and credentials can be supplied through environment variables so secrets do not
# have to live in the notebook; the fallbacks are the values previously hard-coded here.
pg_properties = {
    'user': os.environ.get('PG_USER', 'postgres'),
    'password': os.environ.get('PG_PASSWORD', 'password'),
    'driver': 'org.postgresql.Driver',
}
pg_url = os.environ.get('PG_URL', 'jdbc:postgresql://bigdata_pg:5432/postgres_db')
ch_properties = {
    'user': os.environ.get('CH_USER', 'clickhouse'),
    'password': os.environ.get('CH_PASSWORD', 'password'),
    'driver': 'com.clickhouse.jdbc.ClickHouseDriver',
}
ch_url = os.environ.get('CH_URL', 'jdbc:clickhouse://clickhouse:8123/clickhouse_db')

In [4]:
def write(df, table_name, create_options='ENGINE = Log()'):
    """Write ``df`` to the ClickHouse table ``table_name`` in overwrite mode.

    Args:
        df: Spark DataFrame holding the report rows.
        table_name: Name of the target table in the database addressed by ``ch_url``.
        create_options: Text passed as the JDBC ``createTableOptions`` option, used by
            Spark when it creates the table (defaults to the ClickHouse ``Log`` engine).

    Connection URL and properties come from the module-level ``ch_url`` and
    ``ch_properties``.
    """
    writer = df.write.mode('overwrite')
    writer = writer.option('createTableOptions', create_options)
    writer.jdbc(url=ch_url, table=table_name, properties=ch_properties)

In [5]:
# Read every source table from PostgreSQL into a DataFrame of the same name.
# NOTE(review): `sellers` is not referenced by any report visible in this notebook.
customers, products, sales, sellers, stores, suppliers = (
    spark.read.jdbc(url=pg_url, table=source_table, properties=pg_properties)
    for source_table in ('customers', 'products', 'sales', 'sellers', 'stores', 'suppliers')
)

In [6]:
# Product reports. `raw` has one row per sale, carrying the sold product's attributes;
# the inner join drops products that were never sold.
raw = products.alias('products').join(
    sales.alias('sales'),
    col('products.id') == col('sales.product_id'),
    'inner'
).select(
    col('products.id').alias('id'),
    col('products.name').alias('name'),
    col('products.price').alias('price'),
    col('products.category').alias('category'),
    col('products.reviews').alias('reviews'),
    col('products.rating').alias('rating'),
    col('sales.quantity').alias('quantity')
)

# Ten products with the largest total sold quantity. `id` breaks ties so that
# `limit` picks the same rows on every run.
top10_quantity_per_product = raw.groupBy('id', 'name').agg(
    F.sum(col('quantity')).alias('total_quantity')
).orderBy(F.desc('total_quantity'), 'id').limit(10)

# Revenue per category: product `price` (presumably the unit price) times sold `quantity`.
total_revenue_per_category = raw.withColumn('revenue', col('price') * col('quantity')).groupBy('category').agg(
    F.sum(col('revenue')).alias('total_revenue')
).orderBy(F.desc('total_revenue'))

# `rating` and `reviews` are product attributes repeated on each of the product's sale
# rows, so these means equal the product's own values (sold products only).
mean_rating_and_reviews_per_product = raw.groupBy('id', 'name').agg(
    F.mean(col('rating')).alias('mean_rating'),
    F.mean(col('reviews')).alias('mean_reviews')
).orderBy('id')

write(top10_quantity_per_product, 'top10_quantity_per_product')
write(total_revenue_per_category, 'total_revenue_per_category')
write(mean_rating_and_reviews_per_product, 'mean_rating_and_reviews_per_product')

In [7]:
# Customer reports. `raw` has one row per sale joined with the buying customer;
# the inner join drops customers without any sale.
raw = customers.alias('customers').join(
    sales.alias('sales'),
    col('customers.id') == col('sales.customer_id'),
    'inner'
).select(
    col('customers.id').alias('id'),
    col('customers.first_name').alias('first_name'),
    col('customers.last_name').alias('last_name'),
    col('customers.country').alias('country'),
    col('sales.total_price').alias('total_price'),
    col('sales.quantity').alias('quantity')
)

# NOTE(review): `total_price` is treated as the full amount of a sale (its name marks it
# as a total), so it is summed as-is rather than multiplied by `quantity` a second time;
# confirm against the `sales` schema.
# Ten customers with the largest total spend; `id` breaks ties so `limit` is deterministic.
top10_expenses_per_customer = raw.groupBy('id', 'first_name', 'last_name').agg(
    F.sum(col('total_price')).alias('total_expenses')
).orderBy(F.desc('total_expenses'), 'id').limit(10)

# `raw` holds one row per sale, so a customer with several sales appears several times:
# count distinct customer ids, not rows.
total_clients_per_country = raw.groupBy('country').agg(
    F.countDistinct(col('id')).alias('total_clients')
).orderBy('country')

# Average sale amount per customer.
mean_check_per_customer = raw.groupBy('id', 'first_name', 'last_name').agg(
    F.mean(col('total_price')).alias('mean_check')
).orderBy('id')

write(top10_expenses_per_customer, 'top10_expenses_per_customer')
write(total_clients_per_country, 'total_clients_per_country')
write(mean_check_per_customer, 'mean_check_per_customer')

In [8]:
# Time-based reports. `raw` has one row per sale with its revenue, quantity and the
# calendar year/month of `date`.
# NOTE(review): `total_price` is treated as the full amount of a sale (its name marks it
# as a total), so it is used as the revenue directly instead of being multiplied by
# `quantity` again; confirm against the `sales` schema.
raw = sales.select(
    col('total_price').alias('revenue'),
    col('quantity'),
    F.year(col('date')).alias('year'),
    F.month(col('date')).alias('month')
)

# `mean_revenue` (revenue per sold unit) is derived after the aggregation from the
# aggregated columns, instead of referencing aliases created inside the same `agg`.
# `try_divide` yields NULL rather than failing when `total_quantity` is 0.
yearly_trends = raw.groupBy('year').agg(
    F.sum(col('revenue')).alias('total_revenue'),
    F.sum(col('quantity')).alias('total_quantity')
).withColumn(
    'mean_revenue', F.try_divide(col('total_revenue'), col('total_quantity'))
).orderBy('year')

monthly_trends = raw.groupBy('year', 'month').agg(
    F.sum(col('revenue')).alias('total_revenue'),
    F.sum(col('quantity')).alias('total_quantity')
).withColumn(
    'mean_revenue', F.try_divide(col('total_revenue'), col('total_quantity'))
).orderBy('year', 'month')

# NOTE(review): despite the table name, this averages `quantity` per sale (column
# `mean_quantity`), while the other "mean_check" reports average revenue — confirm intent.
mean_check_per_month = raw.groupBy('year', 'month').agg(
    F.mean(col('quantity')).alias('mean_quantity')
).orderBy('year', 'month')

write(yearly_trends, 'yearly_trends')
write(monthly_trends, 'monthly_trends')
write(mean_check_per_month, 'mean_check_per_month')

In [9]:
# Store reports. Sales are matched to stores by name (`sales.store_name` == `stores.name`);
# stores without sales are dropped by the inner join.
# NOTE(review): presumably store names are unique in `stores`; a duplicated name would
# match each of its sales more than once.
raw = stores.alias('stores').join(
    sales.alias('sales'),
    col('sales.store_name') == col('stores.name'),
    'inner'
).select(
    col('stores.name').alias('name'),
    col('stores.country').alias('country'),
    col('stores.city').alias('city'),
    col('sales.total_price').alias('total_price'),
    col('sales.quantity').alias('quantity')
)

# NOTE(review): `total_price` is treated as the full amount of a sale (its name marks it
# as a total), so it is the revenue as-is rather than being multiplied by `quantity`
# again; confirm against the `sales` schema.
# Five stores with the highest revenue; `name` breaks ties so `limit` is deterministic.
top5_revenue_per_store = raw.groupBy('name').agg(
    F.sum(col('total_price')).alias('total_revenue')
).orderBy(F.desc('total_revenue'), 'name').limit(5)

# Number of sales (joined rows) per store location.
sales_per_countries_and_cities = raw.groupBy('country', 'city').agg(
    F.count(col('name')).alias('total_sales')
).orderBy('country', 'city')

# Average sale amount per store, lowest first.
mean_check_by_store = raw.groupBy('name').agg(
    F.mean(col('total_price')).alias('mean_revenue')
).orderBy('mean_revenue')

write(top5_revenue_per_store, 'top5_revenue_per_store')
write(sales_per_countries_and_cities, 'sales_per_countries_and_cities')
write(mean_check_by_store, 'mean_check_by_store')

In [10]:
# Supplier reports. Sales are matched to suppliers by name
# (`sales.supplier_name` == `suppliers.name`); suppliers without sales are dropped by the
# inner join.
# NOTE(review): presumably supplier names are unique in `suppliers`; a duplicated name
# would match each of its sales more than once.
raw = suppliers.alias('suppliers').join(
    sales.alias('sales'),
    col('sales.supplier_name') == col('suppliers.name'),
    'inner'
).select(
    col('suppliers.name').alias('name'),
    col('suppliers.country').alias('country'),
    col('sales.total_price').alias('total_price'),
    col('sales.quantity').alias('quantity')
)

# NOTE(review): `total_price` is treated as the full amount of a sale (its name marks it
# as a total), so it is the revenue as-is rather than being multiplied by `quantity`
# again; confirm against the `sales` schema.
# Five suppliers with the highest revenue; `name` breaks ties so `limit` is deterministic.
top5_revenue_per_supplier = raw.groupBy('name').agg(
    F.sum(col('total_price')).alias('total_revenue')
).orderBy(F.desc('total_revenue'), 'name').limit(5)

# Average sale amount (`total_price`) per supplier.
mean_price_per_supplier = raw.groupBy('name').agg(
    F.mean(col('total_price')).alias('mean_price')
).orderBy('name')

# Number of sales (joined rows) per supplier country.
sales_per_countries = raw.groupBy('country').agg(
    F.count(col('name')).alias('total_sales')
).orderBy('country')

write(top5_revenue_per_supplier, 'top5_revenue_per_supplier')
write(mean_price_per_supplier, 'mean_price_per_supplier')
write(sales_per_countries, 'sales_per_countries')

In [11]:
# Ten highest-rated and ten lowest-rated products in one table. `id` breaks rating ties
# so both `limit`s pick the same rows on every run.
products_with_extreme_ratings = products.select(
    col('id'),
    col('name'),
    col('rating')
).orderBy(F.desc('rating'), 'id').limit(10).union(
    products.select(
        col('id'),
        col('name'),
        col('rating')
    ).orderBy('rating', 'id').limit(10)
)

# Correlation (Spark's default Pearson method) between each sold product's total
# quantity and its rating; `stat.corr` returns a single Python float to the driver.
correlation = sales.alias('sales').join(
    products.alias('products'),
    col('sales.product_id') == col('products.id'),
    'inner'
).groupBy('products.id').agg(
    F.sum(col('sales.quantity')).alias('total_quantity'),
    F.mean(col('products.rating')).alias('mean_rating')
).stat.corr('total_quantity', 'mean_rating')

# Wrap the scalar in a one-row DataFrame so it can be written like the other reports.
# The column type is declared explicitly rather than inferred from the Python value.
correlation_between_quantity_and_rating = spark.createDataFrame(
    [(correlation, )],
    'correlation double'
)

# Ten products with the most reviews; `id` breaks ties so `limit` is deterministic.
top10_products_per_reviews = products.select(
    col('id'),
    col('name'),
    col('reviews')
).orderBy(F.desc(col('reviews')), 'id').limit(10)

write(products_with_extreme_ratings, 'products_with_extreme_ratings')
write(correlation_between_quantity_and_rating, 'correlation_between_quantity_and_rating')
write(top10_products_per_reviews, 'top10_products_per_reviews')

In [12]:
# Stop the Spark session created at the top of the notebook; no Spark work can run after this.
spark.stop()