In [1]:
import logging
import os
import socket

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, TimestampType, StructType, StructField

In [2]:
# Object-store locations: the raw ratings CSV that is read, and the two
# prefixes the computed features are written to.
IN_PATH_RAW_DATA = 's3a://kueski-store/raws/rating.csv'
OUT_PATH_FEATURES_ALL_ROWS = 's3a://kueski-store/features/allrows/'
OUT_PATH_FEATURES_ONLINE_STORE = 's3a://kueski-store/features/onlinestore/'

# IPv4 address of the host named 'minio'; it is spliced into the S3A endpoint
# URL when the Spark session is configured.
minio_socket = socket.gethostbyname('minio')

In [3]:
# Build (or reuse) the Spark session used by the whole notebook, pointing the
# S3A filesystem at the MinIO endpoint resolved above.
#
# Credentials are taken from the conventional AWS environment variables so
# they do not have to live in the notebook; the fallbacks keep the previous
# hard-coded values so an unconfigured environment behaves exactly as before.
s3a_endpoint = 'http://' + minio_socket + ':9000'
s3a_access_key = os.environ.get("AWS_ACCESS_KEY_ID", "kueski")
s3a_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "kueski123")

spark = (
    SparkSession.builder
    .appName("kueski_challenge")
    .config('spark.hadoop.fs.s3a.endpoint', s3a_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
    # Path-style requests: the bucket goes in the URL path, not the host name.
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .getOrCreate()
)

21/07/08 02:04:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Explicit schema for the ratings CSV; every column is declared nullable.
custom_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('userId', IntegerType()),
        ('movieId', IntegerType()),
        ('rating', FloatType()),
        ('timestamp', TimestampType()),
    )
])

In [5]:
# Load the raw ratings file; the first line is a header and the column types
# come from custom_schema instead of being inferred.
df = spark.read.csv(
    path=IN_PATH_RAW_DATA,
    schema=custom_schema,
    header=True,
)

21/07/08 02:04:56 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
21/07/08 02:04:58 WARN ApacheUtils: NoSuchMethodException was thrown when disabling normalizeUri. This indicates you are using an old version (< 4.5.8) of Apache http client. It is recommended to use http client version >= 4.5.9 to avoid the breaking change introduced in apache client 4.5.7 and the latency in exception handling. See https://github.com/aws/aws-sdk-java/issues/1919 for more information


In [6]:
# Sanity check: print the column names and types of the loaded frame.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [7]:
# Peek at the first three rows of the raw ratings.
df.show(3)

[Stage 0:>                                                          (0 + 1) / 1]

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
+------+-------+------+-------------------+
only showing top 3 rows



                                                                                

In [8]:
# Per-user window over the rating history in chronological order.
# movieId is a tie-breaker: several ratings of one user can share a timestamp,
# and ordering by timestamp alone would make the row order (and therefore both
# features below) vary between runs.
# NOTE(review): assumes a user rates a given movie at most once, so that
# (timestamp, movieId) is a total order within a user - confirm.
window_def = Window.partitionBy('userId').orderBy('timestamp', 'movieId')

# Frame covering every row strictly before the current one. An explicit ROWS
# frame is required: the default frame of an ordered window is RANGE up to the
# current row, which also pulls in rows that tie with the current row on the
# ordering key and would leak the current rating into the "previous" average.
window_previous = window_def.rowsBetween(Window.unboundedPreceding, -1)

df_calc = df.withColumn(
    # Number of ratings the user made before this one (0 for the first).
    'nb_previous_ratings',
    F.row_number().over(window_def) - 1
).withColumn(
    # Mean of those earlier ratings; null for the first rating because the
    # frame is empty there.
    'avg_ratings_previous',
    F.avg('rating').over(window_previous)
)
df_calc.filter(df_calc.userId == 1).show(10)

                                                                                

+------+-------+------+-------------------+-------------------+--------------------+
|userId|movieId|rating|          timestamp|nb_previous_ratings|avg_ratings_previous|
+------+-------+------+-------------------+-------------------+--------------------+
|     1|    924|   3.5|2004-09-10 03:06:38|                  0|                null|
|     1|    919|   3.5|2004-09-10 03:07:01|                  1|                 3.5|
|     1|   2683|   3.5|2004-09-10 03:07:30|                  2|                 3.5|
|     1|   1584|   3.5|2004-09-10 03:07:36|                  3|                 3.5|
|     1|   1079|   4.0|2004-09-10 03:07:45|                  4|                 3.5|
|     1|    653|   3.0|2004-09-10 03:08:11|                  5|                 3.6|
|     1|   2959|   4.0|2004-09-10 03:08:18|                  6|                 3.5|
|     1|    337|   3.5|2004-09-10 03:08:29|                  7|  3.5714285714285716|
|     1|   1304|   3.0|2004-09-10 03:08:40|                  8|  

In [9]:
# Persist the per-rating features as Parquet, replacing any previous output.
df_calc.write.parquet(OUT_PATH_FEATURES_ALL_ROWS, mode="overwrite")

                                                                                

In [10]:
# Reduce the features to the online-store view: for each user keep only the
# row whose nb_previous_ratings equals that user's maximum.
# NOTE: this rebinds df_calc to the reduced frame, so any cell that expects
# the full per-rating frame must not be re-run after this one.
w = Window.partitionBy('userId')
df_calc = (
    df_calc
    .select('userId', 'nb_previous_ratings', 'avg_ratings_previous')
    .withColumn('max_nb_previous_ratings', F.max('nb_previous_ratings').over(w))
    .filter(F.col('nb_previous_ratings') == F.col('max_nb_previous_ratings'))
    .drop('max_nb_previous_ratings')
)

In [11]:
# Peek at two rows of the reduced, per-user feature frame.
df_calc.show(2)



+------+-------------------+--------------------+
|userId|nb_previous_ratings|avg_ratings_previous|
+------+-------------------+--------------------+
|   148|                127|   3.590551181102362|
|   463|                 79|  3.8987341772151898|
+------+-------------------+--------------------+
only showing top 2 rows



                                                                                

In [12]:
# Persist the per-user features as Parquet, replacing any previous output.
df_calc.write.parquet(OUT_PATH_FEATURES_ONLINE_STORE, mode="overwrite")

21/07/08 02:31:57 WARN S3AInstrumentation: Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{blocksSubmitted=1, blocksInQueue=1, blocksActive=0, blockUploadsCompleted=0, blockUploadsFailed=0, bytesPendingUpload=0, bytesUploaded=9826, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, exceptionsInMultipartFinalize=0, transferDuration=0 ms, queueDuration=0 ms, averageQueueTime=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s}
                                                                                