# Data processing step

### 1. Initialize spark session

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create the SparkSession used by every cell below, or reuse the one
# that is already active in this kernel (getOrCreate).
spark = (
    SparkSession.builder
    .appName("Data processcing DE")
    .getOrCreate()
)

print("Successful")

Successful


### 2. Read the student list file.

In [9]:
hdfs_path_student_list = "hdfs://namenode:9000/raw_zone/fact/student_list"

# Load the student list from HDFS as CSV. No header or schema is supplied,
# so Spark names the columns _c0 and _c1 and reads both as strings.
student_df = spark.read.format("csv").load(hdfs_path_student_list)

# Inspect the inferred column names and types
student_df.printSchema()

# Preview the first rows
student_df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)

+---+--------------------+
|_c0|                 _c1|
+---+--------------------+
|  1|          Mai Đức An|
|  2|      Nguyễn Mai Anh|
|  3|   Ngô Ngọc Tuấn Anh|
|  4|      Trần Trung Anh|
|  5|       Trần Ngọc Bảo|
|  6|  Nguyễn Vũ Hòa Bình|
|  7|    Nguyễn Thành Đạt|
|  8|        Đỗ Thành Đạt|
|  9|    Nguyễn Khoa Đoàn|
| 10|    Nguyễn Quốc Dũng|
| 11|     Đường Minh Quân|
| 12|   Dương Quang Giang|
| 13|    Nguyễn Minh Hiếu|
| 14|        Ngô Phi Hùng|
| 15|Nguyễn Đình Thiên...|
| 16|        Đỗ Doãn Khắc|
| 17|      Châu Minh Khải|
| 18|      Phạm Đình Khôi|
| 19|        Lê Bảo Khánh|
| 20|        Lê Minh Phúc|
+---+--------------------+
only showing top 20 rows



### 3. Read the student activity file.

In [10]:
hdfs_path_activity = "hdfs://namenode:9000/raw_zone/fact/activity"

# Load the activity records from HDFS; the Parquet files carry their own schema
activity_df = spark.read.format("parquet").load(hdfs_path_activity)

# Inspect the column names and types stored in the files
activity_df.printSchema()

# Preview the first rows
activity_df.show()

root
 |-- studentCode: integer (nullable = true)
 |-- activity: string (nullable = true)
 |-- numberOfFiles: integer (nullable = true)
 |-- timestamp: string (nullable = true)

+-----------+--------+-------------+---------+
|studentCode|activity|numberOfFiles|timestamp|
+-----------+--------+-------------+---------+
|          4|   write|            7|6/10/2024|
|         33|    read|            5|6/12/2024|
|         33| execute|            1|6/13/2024|
|          6|   write|            6|6/15/2024|
|         24| execute|            8|6/12/2024|
|         22|   write|            2|6/12/2024|
|         31|   write|            9|6/13/2024|
|          8|   write|            4|6/13/2024|
|         21|    read|            5|6/12/2024|
|         26| execute|            2|6/10/2024|
|         24|    read|           10|6/12/2024|
|         10|    read|            6|6/15/2024|
|         20|   write|            7|6/14/2024|
|         14| execute|            7|6/11/2024|
|          5| execute|  

### 4. Convert the timestamp column from string to date, then sort by date

In [20]:
from pyspark.sql.functions import to_date

# Parse `timestamp` only while it is still the raw "M/d/yyyy" string column.
# This cell overwrites activity_df in place, so without the guard a second run
# would apply the M/d/yyyy pattern to values that are already dates
# (rendered as yyyy-MM-dd), which no longer match that pattern.
if dict(activity_df.dtypes)["timestamp"] == "string":
    activity_df = activity_df.withColumn(
        "timestamp", to_date(activity_df["timestamp"], "M/d/yyyy")
    )

# Order the activity records chronologically
activity_df = activity_df.sort("timestamp")
activity_df.show()

+-----------+--------+-------------+----------+
|studentCode|activity|numberOfFiles| timestamp|
+-----------+--------+-------------+----------+
|          5| execute|            1|2024-06-10|
|         35| execute|            4|2024-06-10|
|         38| execute|            6|2024-06-10|
|         22| execute|            6|2024-06-10|
|          8|   write|            7|2024-06-10|
|         33|   write|            3|2024-06-10|
|          4|    read|            4|2024-06-10|
|         24|   write|           10|2024-06-10|
|          1|    read|            9|2024-06-10|
|         31|    read|            1|2024-06-10|
|          3| execute|            4|2024-06-10|
|         26| execute|            2|2024-06-10|
|         10|   write|            5|2024-06-10|
|         36|    read|            6|2024-06-10|
|         17| execute|            5|2024-06-10|
|          5| execute|            1|2024-06-10|
|         16|    read|            6|2024-06-10|
|         36|    read|            5|2024

In [31]:
# Student whose activity is reported by the cells below.
TARGET_STUDENT_CODE = 37

# Attach the student name to each activity record, then keep one student.
# studentCode is an integer while _c0 was read from CSV as a string, so the
# key is cast explicitly instead of relying on implicit type coercion in the
# join condition. The cast is only used for matching; _c0 itself is unchanged
# in the result.
joined_df = (
    activity_df
    .join(student_df, activity_df.studentCode == student_df._c0.cast("int"))
    .filter(activity_df.studentCode == TARGET_STUDENT_CODE)
)
joined_df.show()

+-----------+--------+-------------+----------+---+----------+
|studentCode|activity|numberOfFiles| timestamp|_c0|       _c1|
+-----------+--------+-------------+----------+---+----------+
|         37|    read|            8|2024-06-10| 37|Đào Anh Vũ|
|         37| execute|            6|2024-06-15| 37|Đào Anh Vũ|
|         37|    read|            5|2024-06-15| 37|Đào Anh Vũ|
|         37| execute|            9|2024-06-12| 37|Đào Anh Vũ|
|         37|    read|           10|2024-06-11| 37|Đào Anh Vũ|
|         37| execute|            9|2024-06-10| 37|Đào Anh Vũ|
|         37|   write|            3|2024-06-11| 37|Đào Anh Vũ|
|         37|   write|            4|2024-06-11| 37|Đào Anh Vũ|
|         37|   write|            5|2024-06-10| 37|Đào Anh Vũ|
|         37|    read|            5|2024-06-15| 37|Đào Anh Vũ|
|         37| execute|            1|2024-06-13| 37|Đào Anh Vũ|
+-----------+--------+-------------+----------+---+----------+



In [37]:
# Import the functions module under an alias rather than importing `sum` by
# name: a bare `sum` from pyspark.sql.functions would shadow Python's builtin
# sum() for every later cell in this kernel.
from pyspark.sql import functions as F

# Total files per (activity, day). joined_df was filtered to a single student,
# so first() simply carries that student's name and code through the grouping.
grouped_df = (
    joined_df
    .groupBy("activity", "timestamp")
    .agg(
        F.first("_c1").alias("student_name"),
        F.first("studentCode").alias("studentCode"),
        F.sum("numberOfFiles").alias("totalFiles"),
    )
)

grouped_df.show()

+--------+----------+------------+-----------+----------+
|activity| timestamp|student_name|studentCode|totalFiles|
+--------+----------+------------+-----------+----------+
| execute|2024-06-10|  Đào Anh Vũ|         37|         9|
| execute|2024-06-12|  Đào Anh Vũ|         37|         9|
| execute|2024-06-13|  Đào Anh Vũ|         37|         1|
| execute|2024-06-15|  Đào Anh Vũ|         37|         6|
|    read|2024-06-10|  Đào Anh Vũ|         37|         8|
|    read|2024-06-11|  Đào Anh Vũ|         37|        10|
|    read|2024-06-15|  Đào Anh Vũ|         37|        10|
|   write|2024-06-10|  Đào Anh Vũ|         37|         5|
|   write|2024-06-11|  Đào Anh Vũ|         37|         7|
+--------+----------+------------+-----------+----------+



In [43]:
from pyspark.sql.functions import date_format

# Arrange the columns in report order, sort chronologically while `timestamp`
# is still a date, then render the date as a compact yyyyMMdd string.
grouped_df = (
    grouped_df
    .select("timestamp", "studentCode", "student_name", "activity", "totalFiles")
    .sort("timestamp")
    .withColumn("timestamp", date_format("timestamp", "yyyyMMdd"))
)
grouped_df.show()

+---------+-----------+------------+--------+----------+
|timestamp|studentCode|student_name|activity|totalFiles|
+---------+-----------+------------+--------+----------+
| 20240610|         37|  Đào Anh Vũ| execute|         9|
| 20240610|         37|  Đào Anh Vũ|    read|         8|
| 20240610|         37|  Đào Anh Vũ|   write|         5|
| 20240611|         37|  Đào Anh Vũ|    read|        10|
| 20240611|         37|  Đào Anh Vũ|   write|         7|
| 20240612|         37|  Đào Anh Vũ| execute|         9|
| 20240613|         37|  Đào Anh Vũ| execute|         1|
| 20240615|         37|  Đào Anh Vũ| execute|         6|
| 20240615|         37|  Đào Anh Vũ|    read|        10|
+---------+-----------+------------+--------+----------+



In [46]:
hdfs_save_path = "hdfs://namenode:9000/raw_zone/fact/output"

# Write the report to HDFS as CSV with a header row.
# mode="overwrite" replaces any previous output at this path, so the cell can
# be re-run; with the default save mode the write fails once the path exists.
# This also matches the local write to ./output, which already overwrites.
grouped_df.write.csv(hdfs_save_path, mode="overwrite", header=True)

print("Successfully save data to HDFS:", hdfs_save_path)

Successfully save data to HDFS: hdfs://namenode:9000/raw_zone/fact/output


In [68]:
import os

# Directory that receives the CSV export, and the fixed name given to the
# single data file inside it.
OUTPUT_DIR = "./output"
OUTPUT_FILE = "output.csv"

# Spark writes one part-*.csv file per partition. coalesce(1) forces a single
# partition so exactly one part file is produced; otherwise every part file
# would be renamed to the same output.csv below and all but the last would be
# lost.
grouped_df.coalesce(1).write.csv(OUTPUT_DIR, mode="overwrite", header=True)

# Print confirmation message
print("Successfully saved data to Docker volume")

# Rename the part file to a stable name and delete the other files Spark
# leaves in the directory (anything that does not end in .csv).
for filename in os.listdir(OUTPUT_DIR):
    file_path = os.path.join(OUTPUT_DIR, filename)
    if filename.endswith(".csv"):
        new_file_path = os.path.join(OUTPUT_DIR, OUTPUT_FILE)
        print(file_path)
        print(new_file_path)
        os.rename(file_path, new_file_path)
    elif os.path.isfile(file_path):
        # os.remove raises on directories, so only plain files are deleted
        os.remove(file_path)

Successfully saved data to Docker volume
./output/part-00000-ef176a2e-2f7e-4de0-ab0d-ea36af3dbfd7-c000.csv
./output/output.csv
