In [36]:
from pyspark.sql import SparkSession

# One shared Spark session for the whole notebook, created (or reused) with Hive support enabled.
session_builder = SparkSession.builder.appName('bitso.com').enableHiveSupport()
spark = session_builder.getOrCreate()

In [2]:
# Ingestion: read each CSV under data/ (header row, inferred schema) and
# register it as a temp view named raw_<source>.
sources = ["deposit", "event", "user_id", "withdrawals"]

for source in sources:
    csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
    raw_frame = csv_reader.csv(f"data/{source}.csv")
    raw_frame.createOrReplaceTempView(f'raw_{source}')

                                                                                

In [27]:
def create_view(name, query):
    """Run a SQL query on the notebook's Spark session and register the result as a temp view.

    Args:
        name: Name of the temp view to create or replace.
        query: SQL text passed to ``spark.sql``.
    """
    result_frame = spark.sql(query)
    result_frame.createOrReplaceTempView(name)

In [8]:
# stg_transactions: deposits and withdrawals stacked into one staging view.
# The deposit branch supplies NULL for `interface` so both SELECT lists line up.
stg_transactions_query = """
    SELECT 
        id,
        event_timestamp,
        user_id,
        amount,
        NULL as interface,
        currency,
        tx_status
    FROM raw_deposit
    UNION ALL
    SELECT 
        id,
        event_timestamp,
        user_id,
        amount,
        interface,
        currency,
        tx_status
    FROM raw_withdrawals
"""
create_view('stg_transactions', stg_transactions_query)

In [4]:
# dim_users: one row per distinct user_id found in the raw user file.
dim_users_query = """
    SELECT
        DISTINCT user_id
    FROM
        raw_user_id
"""
create_view('dim_users', dim_users_query)

In [13]:
# dim_time
# Calendar dimension: one row per distinct day, with year/month/day parts.
# The days are collected from BOTH the transactions and the raw events, because
# fact_user_events also joins to this dimension; building it from transactions
# alone leaves events on transaction-free days without a matching date.
create_view(
    name='dim_time',
    query="""
    WITH all_timestamps AS (
        SELECT
            event_timestamp
        FROM
            stg_transactions
        UNION ALL
        SELECT
            event_timestamp
        FROM
            raw_event
    ),
    date_data AS (
        SELECT
            date_trunc('day', event_timestamp) AS date,
            extract(year FROM event_timestamp) AS year,
            extract(month FROM event_timestamp) AS month,
            extract(day FROM event_timestamp) AS day
        FROM
            all_timestamps
    )
    SELECT
        DISTINCT
        date,
        year,
        month,
        day
    FROM
        date_data
    ORDER BY
        date
""")

In [28]:
# dim_currency: the distinct currencies seen across staged transactions.
dim_currency_query = """
    SELECT
        DISTINCT currency
    FROM
        stg_transactions
     """
create_view('dim_currency', dim_currency_query)

In [103]:
# dim_event_name: the distinct event names present in the raw event log.
dim_event_name_query = """
    SELECT
        DISTINCT event_name
    FROM
        raw_event
     """
create_view('dim_event_name', dim_event_name_query)

In [87]:
# dim_transaction_type: static two-row dimension ('deposit', 'withdrawal').
dim_transaction_type_query = """
    SELECT
        'deposit' as transaction_type
    UNION ALL
    SELECT
        'withdrawal' as transaction_type
     """
create_view('dim_transaction_type', dim_transaction_type_query)

In [89]:
# fact_transactions
# One row per deposit or withdrawal, keyed to the user, date and currency dimensions.
#
# transaction_type is assigned from the table each row originates in rather than
# inferred from `interface IS NOT NULL`: with the inference, a withdrawal row whose
# interface is missing would be labelled as a deposit.
# The joins are LEFT joins, so a transaction whose user_id / currency is not present
# in the corresponding dimension is kept with a NULL key instead of being dropped.
# The former join to dim_transaction_type selected no columns and could not change
# the result, so it is omitted.
create_view(
    name='fact_transactions',
    query="""
    WITH transactions_data AS (
        SELECT
            event_timestamp,
            user_id,
            amount,
            currency,
            tx_status,
            'deposit' AS transaction_type
        FROM
            raw_deposit
        UNION ALL
        SELECT
            event_timestamp,
            user_id,
            amount,
            currency,
            tx_status,
            'withdrawal' AS transaction_type
        FROM
            raw_withdrawals
    )
    SELECT
        t.event_timestamp,
        u.user_id,
        d.date,
        c.currency,
        t.amount,
        t.tx_status,
        t.transaction_type
    FROM
        transactions_data t
    LEFT JOIN
        dim_users u ON t.user_id = u.user_id
    LEFT JOIN
        dim_time d ON date_trunc('day', t.event_timestamp) = d.date
    LEFT JOIN
        dim_currency c ON t.currency = c.currency
    """)
    

In [106]:
# fact_user_events
# One row per raw event, keyed to the user dimension and to the event's calendar day.
#
# `date` is derived directly from the event's own timestamp. It has the same value
# the dim_time join would return on a match, but it is also populated for events
# on days that have no row in dim_time, so date-filtered queries do not silently
# lose those events.
# The user join is a LEFT join: events from a user_id missing in dim_users are kept
# with a NULL user_id. The former join to dim_event_name selected no columns and
# could not change the result, so it is omitted.
create_view(
    name='fact_user_events',
    query="""
    SELECT
        e.event_timestamp,
        u.user_id,
        date_trunc('day', e.event_timestamp) AS date,
        e.event_name
    FROM
        raw_event e
    LEFT JOIN
        dim_users u ON e.user_id = u.user_id
""")

In [107]:
# Print schemas and export tables to CSV
# Only model tables are exported. The name must START with one of these prefixes;
# a substring test would also pick up unrelated catalog entries that merely
# contain "dim_" or "fact_" somewhere in their name.
EXPORT_PREFIXES = ('fact_', 'dim_')

tables = spark.catalog.listTables()
for table in tables:
    if not table.name.startswith(EXPORT_PREFIXES):
        continue
    print(f'Table: {table.name}')
    # Resolve the table once and reuse the DataFrame for both the schema and the export.
    df = spark.table(table.name)
    df.printSchema()
    # coalesce(1) produces a single CSV part file per table under output/<table name>.
    df.coalesce(1).write.option("header", "true").mode("overwrite").csv(f"output/{table.name}")
    


Table: dim_currency
root
 |-- currency: string (nullable = true)

Table: dim_event_name
root
 |-- event_name: string (nullable = true)

Table: dim_time
root
 |-- date: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



                                                                                

Table: dim_transaction_type
root
 |-- transaction_type: string (nullable = false)

Table: dim_users
root
 |-- user_id: string (nullable = true)

Table: fact_transactions
root
 |-- event_timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- currency: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- tx_status: string (nullable = true)
 |-- transaction_type: string (nullable = false)



                                                                                

Table: fact_user_events
root
 |-- event_timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- event_name: string (nullable = true)



                                                                                

In [None]:
# How many users were active on a given day (i.e. made a deposit or a withdrawal).
daily_active_users = spark.sql("""
SELECT
    date,
    COUNT(DISTINCT user_id) AS active_users
FROM
    fact_transactions
WHERE 
    transaction_type in ('deposit', 'withdrawal')
GROUP BY
    date
ORDER BY
    date
""")
daily_active_users.show()

In [None]:
# Identify users who have not made a deposit.
# Left join to deposit rows only, then keep the users for which no deposit row matched.
users_without_deposit = spark.sql("""
SELECT 
    u.user_id
FROM 
    dim_users u
LEFT JOIN 
    fact_transactions t 
ON 
    u.user_id = t.user_id AND t.transaction_type = 'deposit'
WHERE 
    t.user_id IS NULL;
""")
users_without_deposit.show()

In [None]:
# Identify, as of a given day, which users have made more than 5 deposits historically.
# The cut-off day is the date literal inside the query.
frequent_depositors = spark.sql("""
SELECT 
    f.user_id,
    COUNT(f.transaction_type) as total_deposits
FROM 
    fact_transactions f
WHERE
    f.transaction_type = 'deposit'
    and f.date <= CAST('2021-01-01' as DATE)
GROUP BY
    user_id
HAVING
     total_deposits  > 5
""")
frequent_depositors.show()

In [None]:
# When was the last time each user logged in (latest 'login' event per user).
last_login_per_user = spark.sql("""
SELECT 
    user_id,
    max(event_timestamp) as last_login
FROM
    fact_user_events
WHERE
    event_name = 'login'
GROUP BY
    user_id
ORDER BY
    last_login
""")
last_login_per_user.show()

In [None]:
# How many times each user has logged in between two dates.
# The two bounds are the literals in the BETWEEN clause.
login_counts = spark.sql("""
SELECT
    user_id,
    COUNT(*) AS login_count
FROM
    fact_user_events
WHERE
    event_name = 'login'
    AND date BETWEEN '2020-01-01' AND '2021-01-01'
GROUP BY
    user_id;
""")
login_counts.show()

In [None]:
# Number of unique currencies deposited on each day.
currencies_deposited_per_day = spark.sql("""
SELECT
    date,
    COUNT(DISTINCT currency) AS unique_currencies
FROM
    fact_transactions
WHERE
    transaction_type = 'deposit'
GROUP BY
    date
ORDER BY
    date;
""")
currencies_deposited_per_day.show()

In [None]:
# Number of unique currencies withdrawn on each day.
currencies_withdrawn_per_day = spark.sql("""
SELECT
    date,
    COUNT(DISTINCT currency) AS unique_currencies
FROM
    fact_transactions
WHERE
    transaction_type = 'withdrawal'
GROUP BY
    date
ORDER BY
    date;
""")
currencies_withdrawn_per_day.show()

In [None]:
# Total amount deposited of a given currency on a given day.
# Sums `amount` over deposit rows, grouped by (date, currency). The previous query
# was a copy of the "unique currencies withdrawn" cell and did not answer this question.
# NOTE(review): every deposit row is summed regardless of tx_status - confirm
# whether only some statuses should count towards the total.
spark.sql("""
SELECT
    date,
    currency,
    SUM(amount) AS total_amount
FROM
    fact_transactions
WHERE
    transaction_type = 'deposit'
GROUP BY
    date,
    currency
ORDER BY
    date,
    currency
""").show()