In [234]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [235]:
!pip install pyspark
!pip install findspark



In [236]:
import findspark
findspark.init()

In [237]:
# import pyspark
# from pyspark.context import SparkContext, SparkConf
# from pyspark.sql.session import SparkSession
# spark = (
#     SparkSession
#     .builder
#     .appName('BIT_01')
#     .config('spark.ui.port', '9311')
#     .config('spark.executor.memoryOverhead', '1G')
#     .config('spark.shuffle.service.enabled', 'true')
#     .config('spark.dynamicAllocation.enabled', 'true')
#     .config('spark.driver.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')\
#     .config('spark.executor.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')\
#     .getOrCreate()
# )

In [238]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import broadcast
import pandas as pd
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" # без этой строчки у нас будет возникать постоянное предупреждение с просьбой установить эту переменную в значение 1, что мы заранее и делаем


spark = SparkSession.builder.appName('csv_to_df').getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark

## Чтение обоих csv-файлов в датафреймы и посмотр их схем.

In [239]:
customer_df = spark.read.csv('customer.csv',sep=';', header=True, inferSchema=True)
transaction_df = spark.read.csv('transaction.csv',sep=';', header=True, inferSchema=True)

customer_df.printSchema()
transaction_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_industry_category: string (nullable = true)
 |-- wealth_segment: string (nullable = true)
 |-- deceased_indicator: string (nullable = true)
 |-- owns_car: string (nullable = true)
 |-- address: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- property_valuation: integer (nullable = true)

root
 |-- transaction_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- online_order: boolean (nullable = true)
 |-- order_status: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- product_line: string (nullable = true)
 |

## Пересохранение данных из csv-файлов в parquet, создание новых датафреймов и посмотр их схем.

In [240]:
customer_df.write.parquet('customer.parquet', mode="overwrite")
transaction_df.write.parquet('transaction.parquet', mode="overwrite")

customer_df_parquet = spark.read.parquet('customer.parquet')
transaction_df_parquet = spark.read.parquet('transaction.parquet')

customer_df_parquet.printSchema()
transaction_df_parquet.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- job_title: string (nullable = true)
 |-- job_industry_category: string (nullable = true)
 |-- wealth_segment: string (nullable = true)
 |-- deceased_indicator: string (nullable = true)
 |-- owns_car: string (nullable = true)
 |-- address: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- property_valuation: integer (nullable = true)

root
 |-- transaction_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- online_order: boolean (nullable = true)
 |-- order_status: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- product_line: string (nullable = true)
 |

## Пересохранение данных из csv-файла в parquet, партиционируя по дате транзакции, создание нового датафрейма и посмотр его схемы (актуально только для таблицы с транзакциями)


In [241]:
transaction_df.write.partitionBy('transaction_date').parquet('transaction_partitioned.parquet', mode="overwrite")

transaction_df_partitioned = spark.read.parquet('transaction_partitioned.parquet')
transaction_df_partitioned.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- online_order: boolean (nullable = true)
 |-- order_status: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- product_line: string (nullable = true)
 |-- product_class: string (nullable = true)
 |-- product_size: string (nullable = true)
 |-- list_price: string (nullable = true)
 |-- standard_cost: string (nullable = true)
 |-- transaction_date: string (nullable = true)



## Подсчет количества записей для датафреймов из пунктов 1-2 и сравнение их времени выполнения.


In [242]:
import time

start_time = time.time()
customer_csv_count = customer_df.count()
print("customer CSV count: ", customer_csv_count)
print("customer CSV count time: ", time.time() - start_time)

start_time = time.time()
customer_parquet_count = customer_df_parquet.count()
print("customer Parquet count: ", customer_parquet_count)
print("customer Parquet count time: ", time.time() - start_time)

start_time = time.time()
transaction_csv_count = transaction_df.count()
print("transaction CSV count: ", transaction_csv_count)
print("transaction CSV count time: ", time.time() - start_time)

start_time = time.time()
transaction_parquet_count = transaction_df_parquet.count()
print("transaction Parquet count: ", transaction_parquet_count)
print("transaction Parquet count time: ", time.time() - start_time)

customer CSV count:  4000
customer CSV count time:  0.18865251541137695
customer Parquet count:  4000
customer Parquet count time:  0.2645571231842041
transaction CSV count:  20000
transaction CSV count time:  0.16055512428283691
transaction Parquet count:  20000
transaction Parquet count time:  0.1544053554534912


## Выполнение запросов с таблицами из пункта 1-3 и сравнение время выполнения

In [243]:
# 1. Вывести количество подтвержденных транзакций по каждому клиенту.
def get_confirmed_transactions_count(transaction):
    start = time.time()
    result = transaction \
        .filter(transaction.order_status == "Approved") \
        .groupBy("customer_id") \
        .agg(F.count("transaction_id").alias("transaction_count"))
    return result, time.time() - start

result_1, time_1 = get_confirmed_transactions_count(transaction_df)
result_2, time_2 = get_confirmed_transactions_count(transaction_df_parquet)
result_3, time_3 = get_confirmed_transactions_count(transaction_df_partitioned)

In [244]:
result_1.orderBy('transaction_count', ascending=False).show(5)
print(f'CSV time: {time_1}')
result_2.orderBy('transaction_count', ascending=False).show(5)
print(f'parquet time: {time_2}')
result_3.orderBy('transaction_count', ascending=False).show(5)
print(f'partitioned time: {time_3}')

+-----------+-----------------+
|customer_id|transaction_count|
+-----------+-----------------+
|       2183|               14|
|       1068|               14|
|       2476|               14|
|        377|               13|
|       2464|               13|
+-----------+-----------------+
only showing top 5 rows

CSV time: 0.01901865005493164
+-----------+-----------------+
|customer_id|transaction_count|
+-----------+-----------------+
|       2183|               14|
|       1068|               14|
|       2476|               14|
|        377|               13|
|       2464|               13|
+-----------+-----------------+
only showing top 5 rows

parquet time: 0.016175270080566406
+-----------+-----------------+
|customer_id|transaction_count|
+-----------+-----------------+
|       2183|               14|
|       2476|               14|
|       1068|               14|
|       1140|               13|
|       1946|               13|
+-----------+-----------------+
only showing top 5 ro

In [245]:
# 2. Вывести распределение транзакций по месяцам и сферам деятельности.
def get_transactions_distribution(customers_df, transactions_df):
    start = time.time()
    transactions_with_customer_info = transaction_df.join(customer_df, on="customer_id")
    result = transactions_with_customer_info.groupBy(F.month(F.to_timestamp(transaction_df.transaction_date,'dd.MM.yyyy')), "job_industry_category").count()
    return result, time.time() - start

result_1, time_1 = get_transactions_distribution(customer_df, transaction_df)
result_2, time_2 = get_transactions_distribution(customer_df_parquet, transaction_df_parquet)

In [246]:
result_1.orderBy('count', ascending=False).show(5)
print(f'CSV time: {time_1}')
result_2.orderBy('count', ascending=False).show(5)
print(f'parquet time: {time_2}')

+-------------------------------------------------+---------------------+-----+
|month(to_timestamp(transaction_date, dd.MM.yyyy))|job_industry_category|count|
+-------------------------------------------------+---------------------+-----+
|                                               10|        Manufacturing|  355|
|                                               12|   Financial Services|  355|
|                                                8|   Financial Services|  352|
|                                               11|        Manufacturing|  351|
|                                                8|        Manufacturing|  350|
+-------------------------------------------------+---------------------+-----+
only showing top 5 rows

CSV time: 0.046240806579589844
+-------------------------------------------------+---------------------+-----+
|month(to_timestamp(transaction_date, dd.MM.yyyy))|job_industry_category|count|
+-------------------------------------------------+-------------

In [247]:
df = pd.DataFrame({
    'Name': ['get_transactions_distribution'],
    'CSV Time': [time_1],
    'Parquet Time': [time_2]
})

In [248]:
def get_transactions_distribution(customers_df, transactions_df):
    start = time.time()
    # Если customers_df достаточно мал, можно использовать broadcast join
    transactions_with_customer_info = transaction_df.join(broadcast(customer_df), on="customer_id")
    result = transactions_with_customer_info.groupBy(F.month(F.to_timestamp(transaction_df.transaction_date,'dd.MM.yyyy')), "job_industry_category").count()
    return result, time.time() - start

result_1, time_1 = get_transactions_distribution(customer_df, transaction_df)
result_2, time_2 = get_transactions_distribution(customer_df_parquet, transaction_df_parquet)

result_1.orderBy('count', ascending=False).show(5)
print(f'CSV time broadcast: {time_1}')
result_2.orderBy('count', ascending=False).show(5)
print(f'parquet time broadcast: {time_2}')

+-------------------------------------------------+---------------------+-----+
|month(to_timestamp(transaction_date, dd.MM.yyyy))|job_industry_category|count|
+-------------------------------------------------+---------------------+-----+
|                                               10|        Manufacturing|  355|
|                                               12|   Financial Services|  355|
|                                                8|   Financial Services|  352|
|                                               11|        Manufacturing|  351|
|                                                8|        Manufacturing|  350|
+-------------------------------------------------+---------------------+-----+
only showing top 5 rows

CSV time broadcast: 0.03452444076538086
+-------------------------------------------------+---------------------+-----+
|month(to_timestamp(transaction_date, dd.MM.yyyy))|job_industry_category|count|
+-------------------------------------------------+----

In [249]:
l = ['get_transactions_distribution_broadcast',time_1, time_2 ]
df.loc[len(df)] = l

In [250]:
# 3. Вывести ФИО клиентов, у которых нет транзакций.
def get_customers_without_transactions(customers_df, transactions_df):
    start = time.time()
    customers_with_transactions_df = transactions_df.select("customer_id").distinct()
    customers_without_transactions_df = customers_df.join(customers_with_transactions_df, on="customer_id", how="leftanti")
    result_df = customers_without_transactions_df.select(F.concat_ws(" ", "first_name", "last_name").alias("full_name"))
    return result_df, time.time() - start

result_1, time_1 = get_customers_without_transactions(customer_df, transaction_df)
result_2, time_2 = get_customers_without_transactions(customer_df_parquet, transaction_df_parquet)

In [251]:
result_1.orderBy('full_name').show(5)
print(f'CSV time: {time_1}')
result_2.orderBy('full_name').show(5)
print(f'parquet time: {time_2}')

+---------------+
|      full_name|
+---------------+
|Abby Brownstein|
|    Addia Abels|
|   Adelice Tams|
|   Adler Dredge|
|    Adler Teale|
+---------------+
only showing top 5 rows

CSV time: 0.029563426971435547
+---------------+
|      full_name|
+---------------+
|Abby Brownstein|
|    Addia Abels|
|   Adelice Tams|
|   Adler Dredge|
|    Adler Teale|
+---------------+
only showing top 5 rows

parquet time: 0.024095773696899414


In [252]:
l = ['get_customers_without_transactions',time_1, time_2 ]
df.loc[len(df)] = l

In [253]:
def get_customers_without_transactions(customers_df, transactions_df):
    start = time.time()
    customers_with_transactions_df = transactions_df.select("customer_id").distinct()
    # Если customers_with_transactions_df достаточно мал, можно использовать broadcast join
    customers_without_transactions_df = customers_df.join(broadcast(customers_with_transactions_df), on="customer_id", how="leftanti")
    result_df = customers_without_transactions_df.select(F.concat_ws(" ", "first_name", "last_name").alias("full_name"))
    return result_df, time.time() - start

result_1, time_1 = get_customers_without_transactions(customer_df, transaction_df)
result_2, time_2 = get_customers_without_transactions(customer_df_parquet, transaction_df_parquet)

result_1.orderBy('full_name').show(5)
print(f'CSV time broadcast: {time_1}')
result_2.orderBy('full_name').show(5)
print(f'parquet time broadcast: {time_2}')

+---------------+
|      full_name|
+---------------+
|Abby Brownstein|
|    Addia Abels|
|   Adelice Tams|
|   Adler Dredge|
|    Adler Teale|
+---------------+
only showing top 5 rows

CSV time broadcast: 0.024186134338378906
+---------------+
|      full_name|
+---------------+
|Abby Brownstein|
|    Addia Abels|
|   Adelice Tams|
|   Adler Dredge|
|    Adler Teale|
+---------------+
only showing top 5 rows

parquet time broadcast: 0.019626855850219727


In [254]:
l = ['get_customers_without_transactions_broadcast',time_1, time_2 ]
df.loc[len(df)] = l

In [255]:
# 4. Найти ФИО клиентов с минимальной/максимальной суммой транзакций (sum(list_price)) за весь период (сумма транзакций не может быть null). Вывод оформить в виде json
def get_max_transaction_sum(customers_df, transactions_df):
    start = time.time()
    transactions = transaction_df.withColumn('list_price', F.col('list_price').cast('double'))
    customers = customer_df.withColumn('full_name', F.concat(F.col('first_name'),F.lit(' '), F.col('last_name')).alias('full_name'))
    customers = customers.join(transactions, customers.customer_id == transactions.customer_id,'left').filter(F.col('list_price').isNotNull()).filter(F.col('full_name').isNotNull()).groupBy('full_name').agg(F.sum("list_price").alias('total_price'))
    max_customer = customers.orderBy('total_price', ascending=False).limit(1).toJSON().collect()[0]
    min_customer = customers.orderBy('total_price').limit(1).toJSON().collect()[0]
    return [max_customer, min_customer], time.time() - start

result_1, time_1 = get_max_transaction_sum(customer_df, transaction_df)
result_2, time_2 = get_max_transaction_sum(customer_df_parquet, transaction_df_parquet)

In [256]:
print(result_1)
print(f'CSV time: {time_1}')
print(result_2)
print(f'parquet time: {time_2}')

['{"full_name":"Barrett Lindley","total_price":3620.0}', '{"full_name":"Nerissa Foote","total_price":850.0}']
CSV time: 1.2714004516601562
['{"full_name":"Barrett Lindley","total_price":3620.0}', '{"full_name":"Nerissa Foote","total_price":850.0}']
parquet time: 0.9906737804412842


In [257]:
l = ['get_max_transaction_sum',time_1, time_2 ]
df.loc[len(df)] = l

In [258]:
def get_max_transaction_sum(customers_df, transactions_df):
    start = time.time()
    transactions = transaction_df.withColumn('list_price', F.col('list_price').cast('double'))
    customers = customer_df.withColumn('full_name', F.concat(F.col('first_name'),F.lit(' '), F.col('last_name')).alias('full_name'))
    # Если customers_df достаточно мал, можно использовать broadcast join
    customers = customers.join(broadcast(transactions), customers.customer_id == transactions.customer_id,'left').filter(F.col('list_price').isNotNull()).filter(F.col('full_name').isNotNull()).groupBy('full_name').agg(F.sum("list_price").alias('total_price'))
    max_customer = customers.orderBy('total_price', ascending=False).limit(1).toJSON().collect()[0]
    min_customer = customers.orderBy('total_price').limit(1).toJSON().collect()[0]
    return [max_customer, min_customer], time.time() - start

result_1, time_1 = get_max_transaction_sum(customer_df, transaction_df)
result_2, time_2 = get_max_transaction_sum(customer_df_parquet, transaction_df_parquet)

print(result_1)
print(f'CSV time broadcast: {time_1}')
print(result_2)
print(f'parquet time broadcast: {time_2}')

['{"full_name":"Barrett Lindley","total_price":3620.0}', '{"full_name":"Nerissa Foote","total_price":850.0}']
CSV time broadcast: 1.215149164199829
['{"full_name":"Barrett Lindley","total_price":3620.0}', '{"full_name":"Nerissa Foote","total_price":850.0}']
parquet time broadcast: 1.5200464725494385


In [259]:
l = ['get_max_transaction_sum_broadcast',time_1, time_2 ]
df.loc[len(df)] = l

In [260]:
# 5. Вывести ФИО клиентов, между соседними транзакциями которых был максимальный интервал (интервал вычисляется в днях). Вывод оформить в виде json
def max_interval_between_transactions(customer_df, transaction_df):
    start = time.time()
    transaction_df = transaction_df.withColumn("transaction_date", F.to_date(F.col("transaction_date"), "dd.MM.yyyy"))
    prev_transaction_df = transaction_df.orderBy("customer_id", "transaction_date")
    prev_transaction_df = prev_transaction_df.withColumn("prev_transaction_date", F.lag("transaction_date").over(Window.partitionBy("customer_id").orderBy("transaction_date")))
    transaction_df = transaction_df.join(prev_transaction_df, ["customer_id", "transaction_date"], "left")
    transaction_df = transaction_df.withColumn("interval", F.datediff("transaction_date", "prev_transaction_date"))
    max_interval_df = transaction_df.orderBy(F.desc("interval")).limit(1)
    result_df = max_interval_df.join(customer_df, ["customer_id"], "left")
    result_df = result_df.select(F.col("first_name"), F.col("last_name"), F.col("interval"))
    result_json = result_df.toJSON().collect()
    return result_json, time.time() - start

result_1, time_1 = max_interval_between_transactions(customer_df, transaction_df)
result_2, time_2 = max_interval_between_transactions(customer_df_parquet, transaction_df_parquet)

In [261]:
print(result_1)
print(f'CSV time: {time_1}')
print(result_2)
print(f'parquet time: {time_2}')

['{"first_name":"Susanetta","interval":357}']
CSV time: 2.182856321334839
['{"first_name":"Susanetta","interval":357}']
parquet time: 0.8392555713653564


In [262]:
l = ['max_interval_between_transactions',time_1, time_2 ]
df.loc[len(df)] = l

In [263]:
def max_interval_between_transactions(customer_df, transaction_df):
    start = time.time()
    transaction_df = transaction_df.withColumn("transaction_date", F.to_date(F.col("transaction_date"), "dd.MM.yyyy"))
    prev_transaction_df = transaction_df.orderBy("customer_id", "transaction_date")
    prev_transaction_df = prev_transaction_df.withColumn("prev_transaction_date", F.lag("transaction_date").over(Window.partitionBy("customer_id").orderBy("transaction_date")))
    transaction_df = transaction_df.join(prev_transaction_df, ["customer_id", "transaction_date"], "left")
    transaction_df = transaction_df.withColumn("interval", F.datediff("transaction_date", "prev_transaction_date"))
    max_interval_df = transaction_df.orderBy(F.desc("interval")).limit(1)
    # Если customer_df достаточно мал, можно использовать broadcast join
    result_df = max_interval_df.join(broadcast(customer_df), ["customer_id"], "left")
    result_df = result_df.select(F.col("first_name"), F.col("last_name"), F.col("interval"))
    result_json = result_df.toJSON().collect()
    return result_json, time.time() - start

result_1, time_1 = max_interval_between_transactions(customer_df, transaction_df)
result_2, time_2 = max_interval_between_transactions(customer_df_parquet, transaction_df_parquet)

print(result_1)
print(f'CSV time broadcast: {time_1}')
print(result_2)
print(f'parquet time broadcast: {time_2}')

['{"first_name":"Susanetta","interval":357}']
CSV time broadcast: 1.1458697319030762
['{"first_name":"Susanetta","interval":357}']
parquet time broadcast: 0.8323578834533691


In [264]:
l = ['max_interval_between_transactions_broadcast',time_1, time_2 ]
df.loc[len(df)] = l

In [265]:
df

Unnamed: 0,Name,CSV Time,Parquet Time
0,get_transactions_distribution,0.046241,0.028032
1,get_transactions_distribution_broadcast,0.034524,0.022297
2,get_customers_without_transactions,0.029563,0.024096
3,get_customers_without_transactions_broadcast,0.024186,0.019627
4,get_max_transaction_sum,1.2714,0.990674
5,get_max_transaction_sum_broadcast,1.215149,1.520046
6,max_interval_between_transactions,2.182856,0.839256
7,max_interval_between_transactions_broadcast,1.14587,0.832358


## Как можно видеть, broadcast и правда все ускоряет

