##### General Setup


In [153]:
# Locate the local Spark installation. This has to run before the pyspark
# imports below, because findspark.init() is what makes pyspark importable.
import os

import findspark

# Prefer an existing SPARK_HOME so the notebook is not tied to one machine's
# directory layout; fall back to the original install location when unset.
path_to_spark = os.environ.get('SPARK_HOME', '/usr/local/spark/spark-3.3.0-bin-hadoop3/')
findspark.init(path_to_spark)

from pyspark.sql import SparkSession, Row, SQLContext, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
import pyspark.sql.functions as F
from sqlalchemy import create_engine
# from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey
# from sqlalchemy.types import Boolean, Date, DateTime, Float, Integer, Text, Time, Interval
import sqlalchemy.types as T
import pandas as pd
import pymysql
import json

# Create (or reuse) the local Spark session for this ETL run. The MySQL
# connector jar is registered through the "spark.jars" setting.
spark = (
    SparkSession.builder
    .config("spark.jars", "drivers/mysql-connector-java-8.0.29.jar")
    .master("local")
    .appName("aktv-etl")
    .getOrCreate()
)

# Enable Arrow for Spark <-> pandas conversions (used by toPandas()).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Get configuration of this session
# spark.sparkContext.getConf().getAll()

In [154]:
def create_dataframe(path: str, table_name: str) -> DataFrame:
    """Load a CSV file into a Spark DataFrame and register it as a temp view.

    The file is read with a header row, schema inference, double-quote
    quoting/escaping, and multi-line field support.

    Args:
        path: Location of the CSV file to read.
        table_name: Name under which the DataFrame is registered as a
            temporary view (an existing view of that name is replaced).

    Returns:
        The loaded Spark DataFrame.
    """
    csv_options = dict(
        header=True,
        inferSchema=True,
        quote='"',
        escape='"',
        multiLine=True,
    )
    loaded = spark.read.csv(path, **csv_options)
    loaded.createOrReplaceTempView(table_name)
    return loaded


def export_to_rdbms(table_name: str, spark_dataframe: DataFrame, column_schema: dict = None, if_exists: str = 'replace') -> None:
    """Write a Spark DataFrame to a MySQL table via pandas and SQLAlchemy.

    The whole DataFrame is collected to the driver with ``toPandas()`` and
    then written with ``pandas.DataFrame.to_sql``, so it has to fit in
    driver memory.

    Connection settings are read from the environment variables
    ``MYSQL_USER``, ``MYSQL_PASSWORD``, ``MYSQL_HOST``, ``MYSQL_PORT`` and
    ``MYSQL_DATABASE``. When a variable is unset, the original
    local-development value is used, so existing runs behave as before.

    Args:
        table_name: Name of the target table.
        spark_dataframe: Spark DataFrame to export.
        column_schema: Optional mapping of column name to SQLAlchemy type,
            forwarded to ``to_sql`` as ``dtype``.
        if_exists: Forwarded to ``to_sql`` ('fail', 'replace' or 'append').

    Returns:
        None. A success message is printed after the write; an error raised
        by ``to_sql`` is printed instead of being re-raised (best-effort
        export, as before). Failures to connect still propagate.
    """
    # Function-scope imports keep this definition self-contained.
    import os
    from urllib.parse import quote_plus

    # Deliberately not named `pd`: that would shadow the pandas module alias.
    pandas_frame = spark_dataframe.toPandas()

    # MySQL configuration (environment overrides, local-dev defaults).
    user = os.environ.get('MYSQL_USER', 'root')
    password = os.environ.get('MYSQL_PASSWORD', 'pwd123')
    host = os.environ.get('MYSQL_HOST', '127.0.0.1')
    port = os.environ.get('MYSQL_PORT', '13306')
    database = os.environ.get('MYSQL_DATABASE', 'aktv_dw')

    # User and password are URL-escaped so characters such as '@' or ':'
    # cannot corrupt the connection URL.
    sql_engine = create_engine(
        f'mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}',
        pool_recycle=3600,
    )

    try:
        # The context manager closes the connection on every exit path.
        with sql_engine.connect() as db_connection:
            try:
                pandas_frame.to_sql(table_name, db_connection, if_exists=if_exists,
                                    index=False, dtype=column_schema)
            except Exception as ex:
                print(ex)
            else:
                print(f"Table {table_name} created/updated successfully.")
    finally:
        # Release the pooled connections held by this per-call engine.
        sql_engine.dispose()


def generate_schema(data_frame: DataFrame) -> dict:
    """Build a column-name -> SQLAlchemy type mapping for a Spark DataFrame.

    The result is meant to be passed as ``column_schema`` to
    ``export_to_rdbms`` (and from there as ``dtype`` to ``to_sql``), so the
    values are SQLAlchemy types, not their string representations.

    Args:
        data_frame: Spark DataFrame whose ``dtypes`` are inspected.

    Returns:
        Dict mapping each column name with a known Spark type to a
        SQLAlchemy type. Columns whose Spark type has no entry in the map
        (e.g. decimal, array, map, struct) are left out so that pandas
        falls back to its own type inference for them.
    """
    # Keys are the type names reported by DataFrame.dtypes; the older
    # aliases ('bool', 'integer', 'datetime', ...) are kept for compatibility.
    SQL_TYPE_MAP = {
        'boolean': T.BOOLEAN,
        'bool': T.BOOLEAN,
        'string': T.TEXT,
        'tinyint': T.SmallInteger,
        'smallint': T.SmallInteger,
        'int': T.Integer,
        'integer': T.Integer,
        'bigint': T.BigInteger,
        'float': T.FLOAT,
        # Same type pandas itself uses for float64 columns.
        'double': T.Float(precision=53),
        'datetime': T.DATETIME,
        'date': T.DATE,
        'time': T.TIME,
        'timedelta': T.Interval,
        'timestamp': T.DateTime,
    }

    columns_schema = {}
    for column_name, spark_type in data_frame.dtypes:
        sql_type = SQL_TYPE_MAP.get(spark_type)
        if sql_type is not None:
            columns_schema[column_name] = sql_type

    return columns_schema


In [155]:
# Table [callbacks]
TABLE_NAME = 'callbacks'
TABLE_KIND = 'fact'
df = create_dataframe(f"landing_files/{TABLE_NAME}.csv", TABLE_NAME)

# Parse the JSON text in `data` into a string -> string map, then explode the
# map so that every entry becomes its own (key, value) row.
df = df.withColumn(
    "data_maptype",
    F.from_json(F.col("data"), MapType(StringType(), StringType())),
)
df = df.select(
    F.col("created_at").alias("created_at_original"),
    F.explode(F.col("data_maptype")),
)

# Pivot back to one row per created_at_original with one column per JSON key.
# NOTE(review): rows sharing the same created_at value are merged here and
# only the first value per key is kept -- confirm created_at is unique.
df = df.groupBy("created_at_original").pivot("key").agg(F.first("value"))

# Columns carried over unchanged from the pivoted frame, in output order.
pivoted_columns = [
    "created_at_original",
    "created_at",
    "channel",
    "channel_id",
    "channel_type",
    "cid",
    "delete_conversation",
    "delete_conversation_channels",
    "delete_messages",
    "delete_user",
    "details",
    "hard_delete",
    "mark_messages_deleted",
    "member",
    "members",
    "message",
    "message_id",
    "reaction",
    "team",
    "total_flags",
    "type",
    "user",
    "watcher_count",
]

df = (
    df.select(
        *[F.col(name) for name in pivoted_columns],
        # user_id is the "id" field of the JSON stored in `user`.
        F.get_json_object(F.col("user"), "$.id").alias("user_id"),
    )
    # team_id is the first run of digits found in `team`.
    .withColumn("team_id", F.regexp_extract("team", "\\d+", 0))
)

# Both ids were extracted as strings; store them as integers.
df = (
    df.withColumn("team_id", F.col("team_id").cast(IntegerType()))
    .withColumn("user_id", F.col("user_id").cast(IntegerType()))
)

# Exporting to MySQL
export_to_rdbms(f'{TABLE_KIND}_{TABLE_NAME}', df, if_exists='replace')

Table fact_callbacks created/updated successfully.


In [156]:
# Table [users]
TABLE_NAME = 'users'
TABLE_KIND = 'dim'
df = create_dataframe(f"landing_files/{TABLE_NAME}.csv", TABLE_NAME)

# Drop the first and last character of other_sources before the JSON lookups
# below (presumably the raw value is a one-element JSON array wrapped in
# [ ] -- TODO confirm against the landing file).
column_name = 'other_sources'
df = df.withColumn(column_name, F.expr(f"substring({column_name}, 2, length({column_name})-2)"))

df = df.select(
    # user_id is selected once; it was previously listed twice, which
    # produced two columns with the same name in the exported frame.
    F.col("user_id"),
    F.col("_fivetran_deleted"),
    F.col("_fivetran_synced"),
    F.col("confirmed_at"),
    F.col("email"),
    F.col("hashed_password"),
    F.col("inserted_at"),
    F.col("other_sources"),
    F.col("phone_number"),
    F.col("updated_at"),
    # Fields pulled out of the other_sources JSON object.
    F.get_json_object(F.col("other_sources"), "$.id").alias("id"),
    F.get_json_object(F.col("other_sources"), "$.source").alias("source"),
    F.get_json_object(F.col("other_sources"), "$.table_id").alias("table_id"),
    F.get_json_object(F.col("other_sources"), "$.table_name").alias("table_name"),
)

# Exporting to MySQL
export_to_rdbms(f'{TABLE_KIND}_{TABLE_NAME}', df, if_exists='replace')


                                                                                

Table dim_users created/updated successfully.


In [157]:
# Table [teams]
TABLE_NAME = 'teams'
TABLE_KIND = 'dim'
df = create_dataframe(f"landing_files/{TABLE_NAME}.csv", TABLE_NAME)

# Drop the first and last character of other_sources before the JSON lookups
# below (presumably the raw value is a one-element JSON array wrapped in
# [ ] -- TODO confirm against the landing file).
column_name = 'other_sources'
df = df.withColumn(
    column_name,
    F.expr(f"substring({column_name}, 2, length({column_name})-2)"),
)

# Columns copied through unchanged, in output order.
team_columns = [
    "team_id",
    "_fivetran_deleted",
    "_fivetran_synced",
    "activity_id",
    "collective_name",
    "gender",
    "inserted_at",
    "name",
    "other_sources",
    "season_id",
    "team_level",
    "updated_at",
    "registration_code",
    "school_activity_id",
    "status",
]
# Fields pulled out of the other_sources JSON object, each into its own column.
other_sources_fields = ["id", "source", "table_id", "table_name"]

df = df.select(
    *[F.col(name) for name in team_columns],
    *[
        F.get_json_object(F.col("other_sources"), f"$.{field}").alias(field)
        for field in other_sources_fields
    ],
)

# Exporting to MySQL
export_to_rdbms(f'{TABLE_KIND}_{TABLE_NAME}', df, if_exists='replace')


Table dim_teams created/updated successfully.


In [158]:
# Table [push_tokens]
TABLE_NAME = 'push_tokens'
TABLE_KIND = 'fact'
df = create_dataframe(f"landing_files/{TABLE_NAME}.csv", TABLE_NAME)

# Columns kept for the fact table, in output order.
push_token_columns = [
    "push_token_id",
    "_fivetran_deleted",
    "_fivetran_synced",
    "expo_push_token",
    "inserted_at",
    "updated_at",
    "user_id",
]

df = (
    df.select(*[F.col(name) for name in push_token_columns])
    # Store user_id as an integer.
    .withColumn("user_id", F.col("user_id").cast(IntegerType()))
)

# Exporting to MySQL
export_to_rdbms(f'{TABLE_KIND}_{TABLE_NAME}', df, if_exists='replace')


Table fact_push_tokens created/updated successfully.


In [159]:
# All exports are done: shut down the Spark session created in the setup cell.
spark.stop()

___