In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Build (or reuse) the Spark session used to reach MinIO through the s3a
# filesystem connector.
#
# Connection settings are read from the environment so that secrets do not
# have to be committed with the notebook. The fallbacks are the values that
# were previously hardcoded here and are intended for local development only.
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minio")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minio123")
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://host.docker.internal:9000")

spark = (
    SparkSession.builder
    .appName("MinIO-Spark-ELT")
    # Pulls the hadoop-aws connector that provides the s3a:// filesystem.
    # NOTE(review): this version presumably has to match the Hadoop build
    # bundled with the Spark installation - confirm.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Use the static access/secret key pair configured above.
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .getOrCreate()
)



In [8]:
# Load one raw batch file from the "duolingo" bucket and preview it.
input_path = "s3a://duolingo/raw/duolingo_batch_1362160679.parquet"

parquet_reader = spark.read
df = parquet_reader.parquet(input_path)

# Print the first rows as a quick sanity check of the schema and values.
df.show()

+-------+----------+-----------------+-----------+--------------------+--------------------+-------+--------------+------------+---------------+------------+---------------+
|user_id| timestamp|learning_language|ui_language|           lexeme_id|       lexeme_string|  delta|      p_recall|history_seen|history_correct|session_seen|session_correct|
+-------+----------+-----------------+-----------+--------------------+--------------------+-------+--------------+------------+---------------+------------+---------------+
| u:fowg|1362160674|               en|         es|ca886bc339ea78c58...|orange/orange<n><sg>|    841|           1.0|           4|              3|           2|              2|
| u:duK3|1362160678|               en|         es|a94a938b71c9bd335...|which/which<det><...| 257458|0.666666666667|           5|              4|           3|              2|
| u:duK3|1362160678|               en|         es|7dab41a78fec1de7e...|     hats/hat<n><pl>| 861434|           0.5|           3|  

In [None]:
# Machine Learning Data
# Row-level feature frame derived from `df`: three ratio columns are added and
# the raw columns they were computed from are removed afterwards.
seconds_per_day = 60 * 60 * 24  # divisor used to express `delta` as `delta_days`

ml_source = df.drop('ui_language', 'lexeme_string', 'timestamp')

ml_with_features = (
    ml_source
    .withColumn('accuracy_rate', col('history_correct') / col('history_seen'))
    .withColumn('session_accuracy', col('session_correct') / col('session_seen'))
    .withColumn('delta_days', col('delta') / seconds_per_day)
)

# The inputs of the derived columns are no longer needed once the ratios exist.
ml_df = ml_with_features.drop(
    'delta',
    'history_seen',
    'history_correct',
    'session_seen',
    'session_correct',
)

ml_df.show()

+-------+-----------------+--------------------+--------------+------------------+------------------+--------------------+
|user_id|learning_language|           lexeme_id|      p_recall|     accuracy_rate|  session_accuracy|          delta_days|
+-------+-----------------+--------------------+--------------+------------------+------------------+--------------------+
| u:fowg|               en|ca886bc339ea78c58...|           1.0|              0.75|               1.0|0.009733796296296296|
| u:duK3|               en|a94a938b71c9bd335...|0.666666666667|               0.8|0.6666666666666666|   2.979837962962963|
| u:duK3|               en|7dab41a78fec1de7e...|           0.5|               1.0|               0.5|   9.970300925925926|
| u:duK3|               en|9ac50d1a5c3247453...|           1.0|               1.0|               1.0|   6.990069444444444|
| u:duK3|               en|4adf5cd40d521b02a...|           1.0|0.8571428571428571|               1.0|   2.963587962962963|
| u:duK3|       

In [None]:
# Application Database
# Same rows as `df`; only the `ui_language` column is left out.
warehouse_excluded_columns = ['ui_language']
warehouse_df = df.drop(*warehouse_excluded_columns)

warehouse_df.show()

+-------+----------+-----------------+--------------------+-------+--------------+------------+---------------+------------+---------------+
|user_id| timestamp|learning_language|           lexeme_id|  delta|      p_recall|history_seen|history_correct|session_seen|session_correct|
+-------+----------+-----------------+--------------------+-------+--------------+------------+---------------+------------+---------------+
| u:fowg|1362160674|               en|ca886bc339ea78c58...|    841|           1.0|           4|              3|           2|              2|
| u:duK3|1362160678|               en|a94a938b71c9bd335...| 257458|0.666666666667|           5|              4|           3|              2|
| u:duK3|1362160678|               en|7dab41a78fec1de7e...| 861434|           0.5|           3|              3|           2|              1|
| u:duK3|1362160678|               en|9ac50d1a5c3247453...| 603942|           1.0|           3|              3|           1|              1|
| u:duK3|1362

In [None]:
# Metabase/Visualization Dataframe
# Start from `df` without the columns that the summary below does not use.
lexeme_rows = df.drop(
    'ui_language',
    'lexeme_id',
    'timestamp',
    'learning_language',
    'delta',
    'user_id',
)

# Per-row error rate: the share of the session's attempts that were not correct.
session_correct_ratio = F.col("session_correct") / F.col("session_seen")
lexeme_rows = lexeme_rows.withColumn("error_rate", 1 - session_correct_ratio)

# Lexeme Summary
# One output row per lexeme_string with averaged recall, averaged error rate
# and the summed number of times it was seen in sessions.
lexeme_metrics = [
    F.mean("p_recall").alias("avg_recall"),
    F.mean("error_rate").alias("error_rate"),
    F.sum("session_seen").alias("total_seen"),
]
metabase_df = lexeme_rows.groupBy("lexeme_string").agg(*lexeme_metrics)

metabase_df.show()

+--------------------+--------------+-------------------+----------+
|       lexeme_string|    avg_recall|         error_rate|total_seen|
+--------------------+--------------+-------------------+----------+
|     hats/hat<n><pl>|           0.5|                0.5|         2|
|c'/ce<prn><tn><nt...|           1.0|                0.0|         2|
|avoir/avoir<vblex...|           1.0|                0.0|         4|
|haus/haus<n><nt><...|           1.0|                0.0|         2|
|keine/kein<det><i...|           1.0|                0.0|         3|
|green/green<adj><...|           1.0|                0.0|         1|
|lui/lui<prn><tn><...|           1.0|                0.0|         1|
|panino/panino<n><...|           0.5|                0.5|         2|
|arancia/arancia<n...|           1.0|                0.0|         1|
|frau/frau<n><f><s...|           1.0|                0.0|         1|
|chicken/chicken<n...|           1.0|                0.0|         1|
|interview/intervi...|           1

In [None]:
# Destination of each derived frame; every frame goes to its own bucket.
ml_output_path = "s3a://duolingo-ml/duolingo_transformed.parquet"
visual_output_path = "s3a://duolingo-visual/duolingo_transformed.parquet"
warehouse_output_path = "s3a://duolingo-query/duolingo_transformed.parquet"

# Write the frames as parquet, replacing whatever is already at the target path.
for frame, destination in (
    (ml_df, ml_output_path),
    (metabase_df, visual_output_path),
    (warehouse_df, warehouse_output_path),
):
    frame.write.mode("overwrite").parquet(destination)