In [1]:
# Installing required packages  
!pip install pyspark  findspark wget

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import findspark

findspark.init()

In [8]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, sum, date_format, to_date


In [9]:
# Creating a SparkContext object
sc = SparkContext.getOrCreate()

# Creating a SparkSession
spark = SparkSession \
    .builder \
    .appName("test_spark") \
    .getOrCreate()

In [72]:
student_schema = StructType([
    StructField("code", IntegerType(), True),
    StructField("fullname", StringType(), True)
])

activity_schema = StructType([
    StructField("student_code", IntegerType(), True),
    StructField("activity", StringType(), True),
    StructField("numberOfFile", StringType(), True),
    StructField("timestamp", StringType(), True)
])

In [33]:
csv_file_path = 'danh_sach_sv_de.csv'
parquet_file_path = 'log_action.csv'

In [73]:
student_df = spark.read.csv(csv_file_path, header=False, schema=student_schema)

print("CSV preview:")
student_df.printSchema()
student_df.limit(5).show()

CSV preview:
root
 |-- code: integer (nullable = true)
 |-- fullname: string (nullable = true)

+----+-----------------+
|code|         fullname|
+----+-----------------+
|   1|       Mai Đức An|
|   2|   Nguyễn Mai Anh|
|   3|Ngô Ngọc Tuấn Anh|
|   4|   Trần Trung Anh|
|   5|    Trần Ngọc Bảo|
+----+-----------------+



In [23]:
activity_df = spark.read.csv(parquet_file_path, header=False, schema=activity_schema)

print("Parquet Schema:")
activity_df.printSchema()
activity_df.limit(5).show()

Parquet Schema:
root
 |-- student_code: integer (nullable = true)
 |-- activity: string (nullable = true)
 |-- numberOfFile: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------------+--------+------------+---------+
|student_code|activity|numberOfFile|timestamp|
+------------+--------+------------+---------+
|           4|   write|           7|6/10/2024|
|          33|    read|           5|6/12/2024|
|          33| execute|           1|6/13/2024|
|           6|   write|           6|6/15/2024|
|          24| execute|           8|6/12/2024|
+------------+--------+------------+---------+



In [74]:
raw_df = activity_df.join(student_df, activity_df['student_code'] == student_df['code'], 'outer')

raw_df.show()
raw_df.count()

+------------+--------+------------+---------+----+-----------------+
|student_code|activity|numberOfFile|timestamp|code|         fullname|
+------------+--------+------------+---------+----+-----------------+
|           1|    read|           5|6/13/2024|   1|       Mai Đức An|
|           1|    read|           8|6/10/2024|   1|       Mai Đức An|
|           1|    read|           9|6/13/2024|   1|       Mai Đức An|
|           1|    read|           5|6/11/2024|   1|       Mai Đức An|
|           1|    read|           6|6/11/2024|   1|       Mai Đức An|
|           1|    read|           9|6/10/2024|   1|       Mai Đức An|
|           1|   write|           4|6/14/2024|   1|       Mai Đức An|
|           1| execute|           3|6/11/2024|   1|       Mai Đức An|
|           1| execute|          10|6/12/2024|   1|       Mai Đức An|
|           1|    read|           8|6/10/2024|   1|       Mai Đức An|
|           1|   write|           6|6/10/2024|   1|       Mai Đức An|
|           1|   wri

400

In [80]:
transformed_df = raw_df.withColumn('date', date_format(to_date(col('timestamp'), 'M/dd/yyyy'), 'yyyyMMdd'))
transformed_df = transformed_df.withColumnRenamed("fullname", "student_name")
print("Number of records: ", transformed_df.count())

Number of records:  400


In [87]:
cols = [col('date'), col('student_code'), col('student_name'), col('activity')]
condition = (col("activity").isin(["Football", "Basketball", "Tennis"]))


transformed_df.createOrReplaceTempView("temp_view")
query = """
    SELECT date, student_code, student_name, activity, int(sum(numberOfFile)) as totalFile
    FROM temp_view
    GROUP BY date, student_code, student_name, activity
"""
# query = """
#     select date, student_code
#     from temp_view
# """

# # Execute the SQL query
result_df = spark.sql(query)
result_df.show()




# output_df.show()

+--------+------------+--------------------+--------+---------+
|    date|student_code|        student_name|activity|totalFile|
+--------+------------+--------------------+--------+---------+
|20240612|          10|    Nguyễn Quốc Dũng|    read|        8|
|20240614|          16|        Đỗ Doãn Khắc| execute|        9|
|20240612|          17|      Châu Minh Khải|   write|        3|
|20240615|           3|   Ngô Ngọc Tuấn Anh|    read|       24|
|20240612|           8|        Đỗ Thành Đạt|    read|        3|
|20240614|          19|        Lê Bảo Khánh|    read|        1|
|20240611|          20|        Lê Minh Phúc|    read|        6|
|20240613|          37|          Đào Anh Vũ| execute|        1|
|20240611|          14|        Ngô Phi Hùng| execute|       12|
|20240611|          21| Trần Phúc Mạnh Linh| execute|       10|
|20240614|          36|        Vũ Khắc Long|    read|        5|
|20240611|          36|        Vũ Khắc Long| execute|        3|
|20240614|           5|       Trần Ngọc 