If you want to run pyspark from jupyter notebook you should run next command

In [None]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("YourAppName") \
#     .config("spark.driver.extraClassPath", "/path/to/postgresql-42.6.0.jar") \
#     .config("spark.executor.extraClassPath", "/path/to/postgresql-42.6.0.jar") \
#     .config("spark.jars", "/path/to/postgresql-42.6.0.jar") \
#     .config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC") \
#     .config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC") \
#     .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
#     .getOrCreate()

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql.functions import col, from_unixtime, date_format, when, round, lit, lag, unix_timestamp, concat, monotonically_increasing_id

In [2]:
events_file_path = 'events.jsonl'

In [3]:
check_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("event_timestamp", IntegerType(), True),
    StructField("event_type", StringType(), True),
])

In [None]:
check_df = spark.read.schema(check_schema).json(events_file_path)

Checking are all event ids unique in dataset

In [None]:
unique_event_ids_count = check_df.select("event_id").distinct().count()
if unique_event_ids_count == check_df.count():
    print("All event_id values are unique.")
else:
    print("Some event_id values are not unique.")

Checking are all event types registration, transaction, login, logout

In [None]:
expected_event_types = {"registration", "transaction", "login", "logout"}
distinct_event_types = set(check_df.select("event_type").distinct().rdd.flatMap(lambda x: x).collect())
if distinct_event_types.issubset(expected_event_types):
    print("All event_type values are within the expected set.")
else:
    print("Some event_type values are not within the expected set.")

In [None]:
check_df.show()

Goal is to create 3 tables, one for registration which will contain information about users, second is transaction which will contain informations about transactions and third is session, which will contain information about login and logout events

In [8]:
registration_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("event_timestamp", IntegerType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_data", StructType([
        StructField("country", StringType(), True),
        StructField("name", StringType(), True),
        StructField("user_id", StringType(), True),
        StructField("device_os", StringType(), True),
        StructField("marketing_campaign", StringType(), True),
    ]), True)
])

In [9]:
transaction_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("event_timestamp", IntegerType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_data", StructType([
        StructField("user_id", StringType(), True),
        StructField("transaction_currency", StringType(), True),
        StructField("transaction_amount", FloatType(), True),
    ]), True)
])

In [10]:
session_schema = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("event_timestamp", IntegerType(), True),
    StructField("event_type", StringType(), True),
    StructField("event_data", StructType([
        StructField("user_id", StringType(), True),
    ]), True)
])

For each table schema is created so when is loaded from original dataset it only contains informations relevant for that table

In [None]:
registration_df = spark.read.schema(registration_schema).json(events_file_path)
registration_df = registration_df.filter(registration_df.event_type == "registration")

In [None]:
transaction_df = spark.read.schema(transaction_schema).json(events_file_path)
transaction_df = transaction_df.filter(transaction_df.event_type == "transaction")

In [None]:
session_df = spark.read.schema(session_schema).json(events_file_path)
session_df = session_df.filter(col("event_type").isin("login", "logout"))

Registration table contains columns: user_id which is also primary key for this table, date of registration, time of registration, name of user, country of user, device_os and marketing_campaign which are not relevant for this challenge but are left in table

In [17]:
new_registration_df = (
    registration_df
    .withColumn("date", date_format(from_unixtime(col("event_timestamp")), "yyyy-MM-dd"))
    .withColumn("time", date_format(from_unixtime(col("event_timestamp")), "HH:mm:ss"))
    .select(
        col("event_data.user_id").alias("user_id"),
        col("date"),
        col("time"),
        col("event_data.name").alias("name"),
        col("event_data.country").alias("country"),
        col("event_data.device_os").alias("device_os"),
        col("event_data.marketing_campaign").alias("marketing_campaign")
    )
)

new_registration_df = new_registration_df.withColumn(
    "marketing_campaign",
    when(col("marketing_campaign") == '', None).otherwise(col("marketing_campaign"))
)
new_registration_df = new_registration_df.withColumn("date", col("date").cast(DateType()))
new_registration_df = new_registration_df.withColumn("time", col("time").cast(TimestampType()))
new_registration_df.show(truncate=False)

+------------------------------------+----------+-------------------+--------------------------+-------+---------+------------------------+
|user_id                             |date      |time               |name                      |country|device_os|marketing_campaign      |
+------------------------------------+----------+-------------------+--------------------------+-------+---------+------------------------+
|63faa66c-683b-11ee-aca7-8699b86be788|2010-05-18|2023-11-18 00:12:07|Lope Cañellas             |ES     |Android  |influencer_marketing    |
|846202ce-683b-11ee-aca7-8699b86be788|2010-05-22|2023-11-18 13:27:51|Dr. Ulrich Hauffer        |DE     |iOS      |NULL                    |
|6422113e-683b-11ee-aca7-8699b86be788|2010-05-18|2023-11-18 23:49:44|Mr. Daniel Taylor         |US     |Android  |NULL                    |
|6fc81560-683b-11ee-aca7-8699b86be788|2010-05-20|2023-11-18 07:50:38|Horst Meyer               |DE     |iOS      |social_media_advertising|
|65494f0a-683b-11ee-

23/11/18 16:28:44 INFO FileSourceStrategy: Pushed Filters: IsNotNull(event_type),EqualTo(event_type,registration)
23/11/18 16:28:44 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(event_type#43),(event_type#43 = registration)
23/11/18 16:28:44 INFO CodeGenerator: Code generated in 15.371391 ms
23/11/18 16:28:44 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 355.5 KiB, free 364.3 MiB)
23/11/18 16:28:44 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 364.3 MiB)
23/11/18 16:28:44 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 172.26.138.14:44159 (size: 35.1 KiB, free: 366.1 MiB)
23/11/18 16:28:44 INFO SparkContext: Created broadcast 20 from showString at NativeMethodAccessorImpl.java:0
23/11/18 16:28:44 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/18 16:28:44 INFO SparkContext: Starting job:

Transactions table contains columns: user_id which is foreign_key connected with table Registration, transaction_currency which is USD for every row, transaction_amount (other currency that were present in original dataset is EUR but everyting is converted to USD), data of transaction, time of transaction and transaction_id which is added so that it can be primary key for this table

In [18]:
conversion_rate = 1.3
new_transaction_df = (
    transaction_df
    .select(
        col("event_data.user_id").alias("user_id"),
        lit("USD").alias("transaction_currency"),
        round(
            when(col("event_data.transaction_currency") == "EUR",
                 col("event_data.transaction_amount") * conversion_rate)
            .otherwise(col("event_data.transaction_amount")), 2
        ).alias("transaction_amount"),
        date_format(from_unixtime(col("event_timestamp")), "yyyy-MM-dd").alias("date"),
        date_format(from_unixtime(col("event_timestamp")), "HH:mm:ss").alias("time")
    )
)
# Show the new DataFrame
new_transaction_df = new_transaction_df.withColumn("transaction_id", monotonically_increasing_id())
new_transaction_df = new_transaction_df.withColumn("date", col("date").cast(DateType()))
new_transaction_df = new_transaction_df.withColumn("time", col("time").cast(TimestampType()))
new_transaction_df.show(truncate=False)

+------------------------------------+--------------------+------------------+----------+-------------------+--------------+
|user_id                             |transaction_currency|transaction_amount|date      |time               |transaction_id|
+------------------------------------+--------------------+------------------+----------+-------------------+--------------+
|81cebb24-683b-11ee-aca7-8699b86be788|USD                 |1.99              |2010-05-22|2023-11-18 23:55:19|0             |
|734ed624-683b-11ee-aca7-8699b86be788|USD                 |0.99              |2010-05-22|2023-11-18 23:25:10|1             |
|6017708e-683b-11ee-aca7-8699b86be788|USD                 |2.99              |2010-05-22|2023-11-18 23:23:46|2             |
|74b67418-683b-11ee-aca7-8699b86be788|USD                 |0.99              |2010-05-21|2023-11-18 23:58:40|3             |
|603a31fa-683b-11ee-aca7-8699b86be788|USD                 |1.29              |2010-05-17|2023-11-18 07:10:10|4             |


23/11/18 16:28:44 INFO FileSourceStrategy: Pushed Filters: IsNotNull(event_type),EqualTo(event_type,transaction)
23/11/18 16:28:44 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(event_type#52),(event_type#52 = transaction)
23/11/18 16:28:44 INFO CodeGenerator: Code generated in 19.721691 ms
23/11/18 16:28:44 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 355.5 KiB, free 363.9 MiB)
23/11/18 16:28:44 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 363.9 MiB)
23/11/18 16:28:44 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 172.26.138.14:44159 (size: 35.1 KiB, free: 366.1 MiB)
23/11/18 16:28:44 INFO SparkContext: Created broadcast 22 from showString at NativeMethodAccessorImpl.java:0
23/11/18 16:28:44 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/18 16:28:44 INFO SparkContext: Starting job: s

Table Session is created in a way so that each row represents one session by having login date and time, and logout date and time, and duration in seconds which represents number of seconds between login and logout. In original dataset it was noticed that for same user there are multiple login or logout events in a row, which doesn't make sense, so only first event of login or logout for that array of multiple same events are kept. Also column named session_id is added so that it can be primary key for this table, and user_id is foreign key connected with table Registration.

In [None]:
new_transaction_df.count()
from pyspark.sql.functions import min, max
print(new_transaction_df.select(max('transaction_amount')).collect())
new_transaction_df.select(min('transaction_amount')).collect()

In [19]:
new_session_df = (
    session_df
    .select(
        col("event_data.user_id").alias("user_id"),
        col("event_type"),
        date_format(from_unixtime(col("event_timestamp")), "yyyy-MM-dd").alias("date"),
        date_format(from_unixtime(col("event_timestamp")), "HH:mm:ss").alias("time")
    )
    .orderBy("user_id", "date", "time")
)

# Show the new DataFrame
new_session_df.show(truncate=False)

23/11/18 16:28:44 INFO FileSourceStrategy: Pushed Filters: In(event_type, [login,logout])
23/11/18 16:28:44 INFO FileSourceStrategy: Post-Scan Filters: event_type#61 IN (login,logout)
23/11/18 16:28:45 INFO CodeGenerator: Code generated in 5.481222 ms
23/11/18 16:28:45 INFO CodeGenerator: Code generated in 8.895111 ms
23/11/18 16:28:45 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 355.5 KiB, free 363.5 MiB)
23/11/18 16:28:45 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 363.5 MiB)
23/11/18 16:28:45 INFO BlockManagerInfo: Added broadcast_24_piece0 in memory on 172.26.138.14:44159 (size: 35.1 KiB, free: 366.0 MiB)
23/11/18 16:28:45 INFO SparkContext: Created broadcast 24 from showString at NativeMethodAccessorImpl.java:0
23/11/18 16:28:45 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/18 16:28:45 INFO SparkCon

+------------------------------------+----------+----------+--------+
|user_id                             |event_type|date      |time    |
+------------------------------------+----------+----------+--------+
|5c55c572-683b-11ee-aca7-8699b86be788|login     |2010-05-08|14:40:34|
|5c55c572-683b-11ee-aca7-8699b86be788|logout    |2010-05-08|14:41:53|
|5c55c572-683b-11ee-aca7-8699b86be788|login     |2010-05-08|16:50:15|
|5c55c572-683b-11ee-aca7-8699b86be788|logout    |2010-05-08|16:50:45|
|5c55c572-683b-11ee-aca7-8699b86be788|login     |2010-05-08|18:11:35|
|5c55c572-683b-11ee-aca7-8699b86be788|logout    |2010-05-08|18:15:03|
|5c55c572-683b-11ee-aca7-8699b86be788|login     |2010-05-08|19:06:41|
|5c55c572-683b-11ee-aca7-8699b86be788|logout    |2010-05-08|19:10:53|
|5c55c572-683b-11ee-aca7-8699b86be788|login     |2010-05-09|19:02:26|
|5c55c572-683b-11ee-aca7-8699b86be788|logout    |2010-05-09|19:09:43|
|5c55c572-683b-11ee-aca7-8699b86be788|login     |2010-05-09|19:58:59|
|5c55c572-683b-11ee-

23/11/18 16:28:45 INFO Executor: Finished task 1.0 in stage 19.0 (TID 20). 4320 bytes result sent to driver
23/11/18 16:28:45 INFO TaskSetManager: Finished task 1.0 in stage 19.0 (TID 20) in 260 ms on 172.26.138.14 (executor driver) (1/2)
23/11/18 16:28:45 INFO Executor: Finished task 0.0 in stage 19.0 (TID 19). 4320 bytes result sent to driver
23/11/18 16:28:45 INFO TaskSetManager: Finished task 0.0 in stage 19.0 (TID 19) in 284 ms on 172.26.138.14 (executor driver) (2/2)
23/11/18 16:28:45 INFO TaskSchedulerImpl: Removed TaskSet 19.0, whose tasks have all completed, from pool 
23/11/18 16:28:45 INFO DAGScheduler: ResultStage 19 (showString at NativeMethodAccessorImpl.java:0) finished in 0.289 s
23/11/18 16:28:45 INFO DAGScheduler: Job 14 is finished. Cancelling potential speculative or zombie tasks for this job
23/11/18 16:28:45 INFO TaskSchedulerImpl: Killing all running tasks in stage 19: Stage finished
23/11/18 16:28:45 INFO DAGScheduler: Job 14 finished: showString at NativeMethod

In [20]:
session_collect = new_session_df.collect()
result_rows = []
old_user_id = "5c55c572-683b-11ee-aca7-8699b86be788"
old_event_type = ""
old_date = None
old_time = None
counter = 0
for row in session_collect:
    if row.user_id != old_user_id:
        result_rows.append((old_user_id, old_date, old_time, None, None))
    elif row.user_id == old_user_id and row.event_type == "logout" and old_event_type == "login":
        result_rows.append((row.user_id, old_date, old_time, row.date, row.time))
    old_user_id = row.user_id
    old_event_type = row.event_type
    old_date = row.date
    old_time = row.time

23/11/18 16:28:45 INFO FileSourceStrategy: Pushed Filters: In(event_type, [login,logout])
23/11/18 16:28:45 INFO FileSourceStrategy: Post-Scan Filters: event_type#61 IN (login,logout)
23/11/18 16:28:45 INFO MemoryStore: Block broadcast_26 stored as values in memory (estimated size 355.5 KiB, free 363.9 MiB)
23/11/18 16:28:45 INFO MemoryStore: Block broadcast_26_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 363.9 MiB)
23/11/18 16:28:45 INFO BlockManagerInfo: Added broadcast_26_piece0 in memory on 172.26.138.14:44159 (size: 35.1 KiB, free: 366.1 MiB)
23/11/18 16:28:45 INFO SparkContext: Created broadcast 26 from collect at /tmp/ipykernel_533/1496398248.py:1
23/11/18 16:28:45 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/18 16:28:45 INFO CodeGenerator: Code generated in 4.256873 ms
23/11/18 16:28:45 INFO SparkContext: Starting job: collect at /tmp/ipykernel_533/1496398248.py:1
23

In [21]:
len(result_rows)

20458

In [22]:
new_session_df = spark.createDataFrame(result_rows, ["user_id", "login_date", "login_time", "logout_date", "logout_time"])

# Show the new DataFrame
new_session_df.show()

+--------------------+----------+----------+-----------+-----------+
|             user_id|login_date|login_time|logout_date|logout_time|
+--------------------+----------+----------+-----------+-----------+
|5c55c572-683b-11e...|2010-05-08|  14:40:34| 2010-05-08|   14:41:53|
|5c55c572-683b-11e...|2010-05-08|  16:50:15| 2010-05-08|   16:50:45|
|5c55c572-683b-11e...|2010-05-08|  18:11:35| 2010-05-08|   18:15:03|
|5c55c572-683b-11e...|2010-05-08|  19:06:41| 2010-05-08|   19:10:53|
|5c55c572-683b-11e...|2010-05-09|  19:02:26| 2010-05-09|   19:09:43|
|5c55c572-683b-11e...|2010-05-09|  19:58:59| 2010-05-09|   20:07:40|
|5c55c572-683b-11e...|2010-05-10|  21:12:40| 2010-05-10|   21:20:39|
|5c55c572-683b-11e...|2010-05-11|  22:49:32| 2010-05-11|   22:50:04|
|5c55c572-683b-11e...|2010-05-12|  23:46:21| 2010-05-12|   23:47:12|
|5c55c572-683b-11e...|2010-05-12|  23:56:56| 2010-05-13|   00:01:49|
|5c55c572-683b-11e...|2010-05-15|  23:52:24| 2010-05-15|   23:53:20|
|5c55c572-683b-11e...|2010-05-15| 

23/11/18 16:28:47 INFO CodeGenerator: Code generated in 5.723432 ms
23/11/18 16:28:47 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/11/18 16:28:47 INFO DAGScheduler: Got job 18 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/11/18 16:28:47 INFO DAGScheduler: Final stage: ResultStage 24 (showString at NativeMethodAccessorImpl.java:0)
23/11/18 16:28:47 INFO DAGScheduler: Parents of final stage: List()
23/11/18 16:28:47 INFO DAGScheduler: Missing parents: List()
23/11/18 16:28:47 INFO DAGScheduler: Submitting ResultStage 24 (MapPartitionsRDD[78] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
23/11/18 16:28:47 INFO MemoryStore: Block broadcast_30 stored as values in memory (estimated size 13.7 KiB, free 363.7 MiB)
23/11/18 16:28:47 INFO MemoryStore: Block broadcast_30_piece0 stored as bytes in memory (estimated size 6.8 KiB, free 363.7 MiB)
23/11/18 16:28:47 INFO BlockManagerInfo: Added broadca

In [23]:
new_session_df = new_session_df.withColumn(
    "duration_seconds",
    (unix_timestamp(concat(col("logout_date"), lit(" "), col("logout_time")), "yyyy-MM-dd HH:mm:ss") -
     unix_timestamp(concat(col("login_date"), lit(" "), col("login_time")), "yyyy-MM-dd HH:mm:ss"))
)
new_session_df = new_session_df.withColumn("session_id", monotonically_increasing_id())
new_session_df = new_session_df.withColumn("login_date", col("login_date").cast(DateType()))
new_session_df = new_session_df.withColumn("logout_date", col("logout_date").cast(DateType()))
new_session_df = new_session_df.withColumn("login_time", col("login_time").cast(TimestampType()))
new_session_df = new_session_df.withColumn("logout_time", col("logout_time").cast(TimestampType()))
# Show the result DataFrame
new_session_df.show(truncate=False)
new_session_df.count()

23/11/18 16:28:47 INFO CodeGenerator: Code generated in 15.77225 ms
23/11/18 16:28:47 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/11/18 16:28:47 INFO DAGScheduler: Got job 19 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/11/18 16:28:47 INFO DAGScheduler: Final stage: ResultStage 25 (showString at NativeMethodAccessorImpl.java:0)
23/11/18 16:28:47 INFO DAGScheduler: Parents of final stage: List()
23/11/18 16:28:47 INFO DAGScheduler: Missing parents: List()
23/11/18 16:28:47 INFO DAGScheduler: Submitting ResultStage 25 (MapPartitionsRDD[80] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
23/11/18 16:28:47 INFO MemoryStore: Block broadcast_31 stored as values in memory (estimated size 20.1 KiB, free 363.7 MiB)
23/11/18 16:28:47 INFO MemoryStore: Block broadcast_31_piece0 stored as bytes in memory (estimated size 9.0 KiB, free 363.7 MiB)
23/11/18 16:28:47 INFO BlockManagerInfo: Added broadca

+------------------------------------+----------+-------------------+-----------+-------------------+----------------+----------+
|user_id                             |login_date|login_time         |logout_date|logout_time        |duration_seconds|session_id|
+------------------------------------+----------+-------------------+-----------+-------------------+----------------+----------+
|5c55c572-683b-11ee-aca7-8699b86be788|2010-05-08|2023-11-18 14:40:34|2010-05-08 |2023-11-18 14:41:53|79              |0         |
|5c55c572-683b-11ee-aca7-8699b86be788|2010-05-08|2023-11-18 16:50:15|2010-05-08 |2023-11-18 16:50:45|30              |1         |
|5c55c572-683b-11ee-aca7-8699b86be788|2010-05-08|2023-11-18 18:11:35|2010-05-08 |2023-11-18 18:15:03|208             |2         |
|5c55c572-683b-11ee-aca7-8699b86be788|2010-05-08|2023-11-18 19:06:41|2010-05-08 |2023-11-18 19:10:53|252             |3         |
|5c55c572-683b-11ee-aca7-8699b86be788|2010-05-09|2023-11-18 19:02:26|2010-05-09 |2023-11-1

23/11/18 16:28:48 INFO BlockManagerInfo: Removed broadcast_30_piece0 on 172.26.138.14:44159 in memory (size: 6.8 KiB, free: 366.0 MiB)
23/11/18 16:28:48 INFO CodeGenerator: Code generated in 16.049492 ms
23/11/18 16:28:48 INFO PythonRunner: Times: total = 183, boot = 4, init = 153, finish = 26
23/11/18 16:28:48 INFO Executor: Finished task 0.0 in stage 26.0 (TID 28). 2277 bytes result sent to driver
23/11/18 16:28:48 INFO TaskSetManager: Finished task 0.0 in stage 26.0 (TID 28) in 218 ms on 172.26.138.14 (executor driver) (1/12)
23/11/18 16:28:48 INFO PythonRunner: Times: total = 214, boot = 4, init = 205, finish = 5
23/11/18 16:28:48 INFO Executor: Finished task 6.0 in stage 26.0 (TID 34). 2277 bytes result sent to driver
23/11/18 16:28:48 INFO TaskSetManager: Finished task 6.0 in stage 26.0 (TID 34) in 263 ms on 172.26.138.14 (executor driver) (2/12)
23/11/18 16:28:48 INFO PythonRunner: Times: total = 193, boot = 4, init = 169, finish = 20
23/11/18 16:28:48 INFO PythonRunner: Times: 

20458

Next block of code is to save this tables as csv files, but those files are not used in later processing.

In [24]:
df_registration = new_registration_df.toPandas()
df_session = new_session_df.toPandas()
df_transaction = new_transaction_df.toPandas()
df_registration.to_csv('registration.csv')
df_session.to_csv('session.csv')
df_transaction.to_csv('transaction.csv')

23/11/18 16:28:48 INFO FileSourceStrategy: Pushed Filters: IsNotNull(event_type),EqualTo(event_type,registration)
23/11/18 16:28:48 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(event_type#43),(event_type#43 = registration)
23/11/18 16:28:48 INFO CodeGenerator: Code generated in 11.376021 ms
23/11/18 16:28:48 INFO MemoryStore: Block broadcast_34 stored as values in memory (estimated size 355.5 KiB, free 363.4 MiB)
23/11/18 16:28:48 INFO MemoryStore: Block broadcast_34_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 363.4 MiB)
23/11/18 16:28:48 INFO BlockManagerInfo: Added broadcast_34_piece0 in memory on 172.26.138.14:44159 (size: 35.1 KiB, free: 366.0 MiB)
23/11/18 16:28:48 INFO SparkContext: Created broadcast 34 from toPandas at /tmp/ipykernel_533/1718442566.py:1
23/11/18 16:28:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/18 16:28:48 INFO SparkContext: Starting job

Each of 3 tables are saved in postgres sql. In order to do the same there should be docker instance or some other instance of postgres sql setup with same configuration as written in next line of code.

In [25]:
jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
properties = {
    "user": "postgres",
    "password": "nordeus",
    "driver": "org.postgresql.Driver",
}

new_registration_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "registration") \
    .mode("overwrite") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .save()

new_session_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "session") \
    .mode("overwrite") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .save()

new_transaction_df.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "transaction") \
    .mode("overwrite") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .save()


23/11/18 16:28:50 INFO FileSourceStrategy: Pushed Filters: IsNotNull(event_type),EqualTo(event_type,registration)
23/11/18 16:28:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(event_type#43),(event_type#43 = registration)
23/11/18 16:28:50 INFO MemoryStore: Block broadcast_39 stored as values in memory (estimated size 355.5 KiB, free 362.6 MiB)
23/11/18 16:28:50 INFO MemoryStore: Block broadcast_39_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 362.6 MiB)
23/11/18 16:28:50 INFO BlockManagerInfo: Added broadcast_39_piece0 in memory on 172.26.138.14:44159 (size: 35.1 KiB, free: 365.9 MiB)
23/11/18 16:28:50 INFO SparkContext: Created broadcast 39 from save at NativeMethodAccessorImpl.java:0
23/11/18 16:28:50 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/18 16:28:50 INFO SparkContext: Starting job: save at NativeMethodAccessorImpl.java:0
23/11/18 16:28:50 INFO DAGSchedule