In [None]:
import config
from config import views
from spark import createSession

from typing import List, Tuple

from matplotlib import pyplot as plt
from pyspark.sql.dataframe import DataFrame

import pyspark.sql.functions as F
import pyspark.sql.types as T

from IPython.display import display

In [None]:
def get_columns_of_type(data_frame: DataFrame, type: str) -> List[str]:
    """Return the names of the columns whose Spark type string equals `type`.

    `data_frame.dtypes` yields `(column_name, type_name)` pairs such as
    `('premium_user', 'boolean')`; the matching names are returned in column order.
    """
    matching: List[str] = []
    for name, dtype in data_frame.dtypes:
        if dtype == type:
            matching.append(name)
    return matching

In [None]:
LENGTH = 80

def show_table_name(table: str, width: int = LENGTH) -> None:
    """Print `table` upper-cased and centred between two rules of '=' characters.

    Args:
        table: name printed as the heading.
        width: length of both rules and of the span the name is centred in;
            defaults to LENGTH.
    """
    rule = '=' * width
    # Concatenate padding and title into one string: passing them as two
    # print() arguments inserts an extra separator space, which pushed the
    # title one column right of centre (and past the rule for long names).
    padding = ' ' * ((width - len(table)) // 2)
    print(rule)
    print(padding + table.upper())
    print(rule)

def show_column_name(column: str) -> None:
    """Print `column` upper-cased on its own line, used as a sub-heading."""
    heading = column.upper()
    print(heading)

In [None]:
VERSION = 'v2'
# Columns holding date/datetime strings; each gets a numeric `<name>_s`
# companion computed with F.unix_timestamp (seconds since the Unix epoch).
TIMESTAMP_COLUMNS = ('timestamp', 'release_date')
# Optional sections ([...]) accept a bare year, a date, or a datetime with
# up to six fractional-second digits.
TIMESTAMP_FORMAT = "yyyy[-MM-dd['T'HH:mm:ss[.SSSSSS]]]"

# Call the loader through its module: `views` is rebound to the loaded
# mapping here, so `views(VERSION)` would fail if this cell were re-run in
# the same kernel.
views = config.views(VERSION)
spark = createSession()

for view, path in views.items():
    df = spark.read.json(path)
    # Booleans become 0/1 integers (later cells compare e.g. premium_user == 1).
    for column in get_columns_of_type(df, 'boolean'):
        df = df.withColumn(column, F.col(column).cast(T.IntegerType()))

    for column in [name for name in df.columns if name in TIMESTAMP_COLUMNS]:
        df = df.withColumn(f'{column}_s', F.unix_timestamp(column, TIMESTAMP_FORMAT))

    df.createOrReplaceTempView(view)

In [None]:
# (view name, DataFrame selecting everything from that temp view), in the order of `views`.
DATA_FRAMES = [(view, spark.sql(f"SELECT * FROM {view}")) for view in views.keys()]

In [None]:
# Per view: list each column with its Spark type, then preview up to 100 000
# rows as a pandas table, falling back to Spark's text output if that fails.
for view, df in DATA_FRAMES:
    show_table_name(view)
    # `dtype` rather than `type`: a module-level loop variable named `type`
    # would shadow the builtin for every later cell in the session.
    for column, dtype in df.dtypes:
        print(column.upper(), '-', dtype)

    try:
        dfp = df.limit(100_000).toPandas()
        display(dfp)
    except Exception as error:
        # Say why the pandas preview failed instead of silently discarding the
        # exception (first line only: JVM errors carry long stack traces),
        # then fall back to Spark's own rendering and a row count.
        reason = next(iter(str(error).splitlines()), '')
        print(f'Preview failed ({error.__class__.__name__}: {reason}); using df.show()')
        df.show()
        print(df.count(), 'rows')

In [None]:
# For every column of every view: how often each value occurs (NULLs first,
# then most frequent) and the number of distinct non-NULL values.
for view, data_frame in DATA_FRAMES:
    show_table_name(view)
    # Iterate names only; `for column, type in ...dtypes` bound `type` at
    # module level and shadowed the builtin for the rest of the session.
    for column in data_frame.columns:
        show_column_name(column)
        # Back-quote the column (doubling any embedded back-tick) so names that
        # are not plain SQL identifiers still parse as column references.
        quoted = '`' + column.replace('`', '``') + '`'
        group_by_column = f"""--sql
            SELECT
                {quoted},
                COUNT(*) AS length
            FROM {view}
            GROUP BY {quoted}
            ORDER BY {quoted} IS NULL DESC, length DESC, {quoted} NULLS FIRST
        """
        df = spark.sql(group_by_column)
        display(df.limit(100_000).toPandas())

        count_distinct = f"""--sql
            SELECT
                COUNT(DISTINCT {quoted})
            FROM {view}
        """
        df = spark.sql(count_distinct)
        display(df.toPandas())

In [None]:
def aggregate_numeric_column(view: str, column: str) -> str:
    """Build a Spark SQL query returning one row of summary statistics for a column.

    Args:
        view: temp view / table to read from (inserted verbatim).
        column: numeric column to summarise. It is back-quoted in the query
            (embedded back-ticks doubled), so names that are not plain
            identifiers still parse.

    Returns:
        The query text. Its single result row holds the column name followed by
        count, min, max, mean, sum, sum of distinct values, kurtosis, skewness,
        sample and population standard deviation, and sample and population
        variance, computed over the non-NULL values.
    """
    quoted = '`' + column.replace('`', '``') + '`'
    return f"""--sql
            SELECT
                "{column}" AS name,
                COUNT({quoted}) AS count,
                MIN({quoted}) AS min,
                MAX({quoted}) AS max,
                AVG({quoted}) AS average,
                SUM({quoted}) AS sum,
                SUM(DISTINCT {quoted}) AS sum_distinct,
                KURTOSIS({quoted}) AS kurtosis,
                SKEWNESS({quoted}) AS skewness,
                STDDEV({quoted}) AS standard_deviation,
                STDDEV_POP({quoted}) AS population_standard_deviation,
                VARIANCE({quoted}) AS variance,
                VAR_POP({quoted}) AS population_variance
            FROM {view}
            WHERE {quoted} IS NOT NULL
        """

# Spark dtype strings summarised as numbers. 'int' is included because the
# loading cell casts every boolean column to IntegerType; accepting only
# 'double' and 'bigint' would silently skip those columns.
NUMERIC_TYPES = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')

# Summary statistics and a 50-bin histogram for every numeric column.
for view, data_frame in DATA_FRAMES:
    show_table_name(view)
    # `dtype`, not `type`: a module-level loop variable named `type` would
    # shadow the builtin for the rest of the session.
    for column, dtype in data_frame.dtypes:
        if dtype not in NUMERIC_TYPES:
            continue
        show_column_name(column)
        df = spark.sql(aggregate_numeric_column(view, column))
        display(df.toPandas())

        # Back-quote the name (doubling embedded back-ticks) so it parses as a column.
        quoted = '`' + column.replace('`', '``') + '`'
        # NOTE(review): unlike the previews, this collects the entire column to
        # the driver (no LIMIT); consider sampling if the views grow large.
        dfp = spark.sql(f"SELECT {quoted} FROM {view}").toPandas()
        dfp.hist(bins=50)
        plt.show()

In [None]:
def explode_column(view: str, column: str) -> str:
    """Build a query listing the distinct elements of an array column.

    Args:
        view: temp view / table to read from (inserted verbatim).
        column: array-typed column to explode. The column and the output alias
            `distinct_<column>` are back-quoted (embedded back-ticks doubled),
            so names that are not plain identifiers still parse.

    Returns:
        The query text. It yields one row per distinct element, NULL first.
    """
    quoted = '`' + column.replace('`', '``') + '`'
    alias = '`distinct_' + column.replace('`', '``') + '`'
    return f"""--sql
            SELECT
                DISTINCT EXPLODE({quoted}) AS {alias}
            FROM {view}
            ORDER BY {alias} NULLS FIRST
        """


def count_exploded_column(view: str, column: str) -> str:
    """Build a query counting the distinct elements of an array column.

    Args:
        view: temp view / table to read from (inserted verbatim).
        column: array-typed column to explode. It is back-quoted (embedded
            back-ticks doubled), so names that are not plain identifiers parse.

    Returns:
        The query text. Its single `length` value counts rows of the
        DISTINCT-exploded elements via COUNT(*), so a NULL element, if any
        array contains one, is counted once.
    """
    quoted = '`' + column.replace('`', '``') + '`'
    exploded = f"""--sql
        SELECT
            DISTINCT EXPLODE({quoted}) AS {quoted}
        FROM {view}
    """

    return f"""--sql
            SELECT
                COUNT(*) AS length
            FROM ({exploded})
        """

# Distinct elements of every array-typed column, followed by how many there are.
for view, data_frame in DATA_FRAMES:
    show_table_name(view)
    # `dtype`, not `type`: a module-level loop variable named `type` would
    # shadow the builtin for the rest of the session.
    for column, dtype in data_frame.dtypes:
        if dtype.startswith('array'):
            show_column_name(column)
            df = spark.sql(explode_column(view, column))
            display(df.toPandas())
            df = spark.sql(count_exploded_column(view, column))
            display(df.toPandas())

In [None]:
# Relationships to check, as (left table, right table) -> (left key, right key).
JOINS = {
    (left, right): (left_key, right_key)
    for left, right, left_key, right_key in (
        ('artists', 'tracks', 'id', 'id_artist'),
        ('tracks', 'track_storage', 'id', 'track_id'),
        ('tracks', 'sessions', 'id', 'track_id'),
        ('users', 'sessions', 'user_id', 'user_id'),
    )
}

In [None]:
def count_everything(table: str) -> str:
    """Build a query returning the row count of `table` as `length_<table>`."""
    template = """--sql
        SELECT
            COUNT(*) AS length_{name}
        FROM {name}
    """
    return template.format(name=table)

def count_joined(tables: Tuple[str, str], ids: Tuple[str, str]) -> str:
    """Build a query counting the rows of the inner join of two tables.

    Args:
        tables: (left table, right table), aliased `first` and `second`.
        ids: (left key column, right key column) compared for equality.

    Returns:
        The query text; the count is named `length_<left>_<right>`.
    """
    left, right = tables[0], tables[1]
    left_key, right_key = ids[0], ids[1]
    return f"""--sql
        SELECT
            COUNT(*) AS length_{left}_{right}
        FROM {left} AS first
        INNER JOIN {right} AS second ON first.{left_key} == second.{right_key}
    """

def count_joined_distinct(tables: Tuple[str, str], ids: Tuple[str, str]) -> str:
    """Build a query counting distinct left-hand keys that find an inner-join partner.

    Args:
        tables: (left table, right table), aliased `first` and `second`.
        ids: (left key column, right key column) compared for equality.

    Returns:
        The query text; the count is named `length_<left>_<right>_distinct`.
    """
    template = """--sql
        SELECT
            COUNT(DISTINCT first.{2}) AS length_{0}_{1}_distinct
        FROM {0} AS first
        INNER JOIN {1} AS second ON first.{2} == second.{3}
    """
    return template.format(tables[0], tables[1], ids[0], ids[1])

# For each relationship: the size of both sides, the size of their inner join,
# and how many distinct left-hand keys find a partner.
for tables, ids in JOINS.items():
    print(tables[0].upper(), '-', tables[1].upper())
    queries = [
        count_everything(tables[0]),
        count_everything(tables[1]),
        count_joined(tables, ids),
        count_joined_distinct(tables, ids),
    ]
    for query in queries:
        df = spark.sql(query)
        display(df.toPandas())

In [None]:
def select_unknown(tables: Tuple[str, str], ids: Tuple[str, str]) -> str:
    """Build a query for rows of `tables[0]` whose key has no match in `tables[1]`.

    Args:
        tables: (table to inspect, table that should contain its keys).
        ids: (key column in tables[0], matching key column in tables[1]).

    Returns:
        Spark SQL selecting every column of `tables[0]` for rows whose non-NULL
        `ids[0]` value does not occur in `tables[1].ids[1]`.
    """
    # A LEFT ANTI JOIN replaces `NOT IN (subquery)`. With NOT IN, a single NULL
    # key in tables[1] makes the predicate NULL for every row, so the query
    # silently returned nothing. Rows whose own key is NULL are still excluded.
    # Building the query also no longer needs a live session or the shared
    # 'temporary' view.
    return f"""--sql
        SELECT
            lhs.*
        FROM {tables[0]} AS lhs
        LEFT ANTI JOIN {tables[1]} AS rhs ON lhs.{ids[0]} = rhs.{ids[1]}
        WHERE lhs.{ids[0]} IS NOT NULL
    """

# Orphans in both directions: rows on either side whose key the other side lacks.
for tables, ids in JOINS.items():
    print(tables[0].upper(), '-', tables[1].upper())
    for pair, keys in ((tables, ids), (tables[::-1], ids[::-1])):
        df = spark.sql(select_unknown(pair, keys))
        display(df.toPandas())

In [None]:
# Premium vs. non-premium user counts, plus each as a percentage of all rows in
# `users`. premium_user is compared with 1/0 because boolean columns are cast
# to integers when the views are loaded (presumably premium_user is one of
# them — confirm against the source data).
premium_user_comparison = """--sql
    SELECT
        COUNT_IF(premium_user == 1) AS premium_users,
        COUNT_IF(premium_user == 0) AS non_premium_users,
        COUNT_IF(premium_user == 0) / COUNT(*) * 100 AS non_premium_users_percentage,
        COUNT_IF(premium_user == 1) / COUNT(*) * 100 AS premium_users_percentage
    FROM users
"""
df = spark.sql(premium_user_comparison)
premium_summary = df.toPandas()
display(premium_summary)