# Main

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

def convert_time_to_gmt8(input_df):
    """Shift the "Date"/"Time" columns forward by 8 hours (GMT -> GMT+8).

    "Date" ("yyyy-MM-dd") and "Time" ("HH:mm:ss") are combined, parsed once,
    shifted by 8 hours and written back in the same formats. "Timestamp" is
    (re)computed as the number of days, with fractional part, elapsed since
    1899-12-30 00:00:00 for the shifted date-time. Rows whose Date/Time cannot
    be parsed end up with nulls in these three columns.

    Args:
        input_df: Spark DataFrame with string columns "Date" and "Time".

    Returns:
        A new DataFrame with "Date", "Time" and "Timestamp" replaced (or
        "Timestamp" appended if it did not exist).
    """
    # Temporary column holding the shifted timestamp, so that Date, Time and
    # Timestamp are all derived from a single parse of the ORIGINAL Date/Time.
    shifted_col = "__gmt8_ts"

    # NOTE: parsing/formatting uses the Spark session time zone; the shift is an
    # exact 8 wall-clock hours only if that zone has no DST transitions (e.g. UTC).
    shifted_df = input_df.withColumn(
        shifted_col,
        F.unix_timestamp(
            F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
        ).cast("timestamp")
        + F.expr("INTERVAL 8 HOURS"),
    )
    shifted = F.col(shifted_col)

    # Wall-clock seconds since midnight of the shifted time.
    seconds_of_day = F.hour(shifted) * 3600 + F.minute(shifted) * 60 + F.second(shifted)

    # Whole calendar days since the base date plus the fraction of the day.
    # Using calendar fields avoids unix_timestamp("1899-12-30 00:00:00"), whose
    # value depends on the session time zone's historical (LMT) offset.
    days_since_base = (
        F.datediff(F.to_date(shifted), F.lit("1899-12-30")) + seconds_of_day / 86400
    )

    return (
        shifted_df
        .withColumn("Date", F.date_format(shifted, "yyyy-MM-dd"))
        .withColumn("Time", F.date_format(shifted, "HH:mm:ss"))
        .withColumn("Timestamp", days_since_base)
        .drop(shifted_col)
    )



def top_five_users(input_df):
    """Return the five users with the most days having at least five data points.

    A row's day is the calendar date of its "Date" + "Time" parsed as
    "yyyy-MM-dd HH:mm:ss". Rows with an unparseable date-time or a null
    "UserID" are ignored. Users are ranked by that day count, descending, with
    ties broken by ascending "UserID".

    Args:
        input_df: Spark DataFrame with "UserID", "Date" and "Time" columns.

    Returns:
        DataFrame with columns "UserID" and "DaysCount" (at most five rows),
        ordered by the ranking above.
    """
    day = F.to_date(
        F.unix_timestamp(
            F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
        ).cast("timestamp")
    )

    # Total number of data points per user per day (a plain aggregate, not a
    # running window count).
    daily_counts = (
        input_df
        .where(F.col("UserID").isNotNull() & day.isNotNull())
        .groupBy("UserID", day.alias("Day"))
        .agg(F.count(F.lit(1)).alias("Points"))
    )

    # Number of days with at least five points, per user.
    active_days = (
        daily_counts
        .where(F.col("Points") >= 5)
        .groupBy("UserID")
        .agg(F.count(F.lit(1)).alias("DaysCount"))
    )

    # UserID as tie-breaker keeps the top-5 selection deterministic.
    return (
        active_days
        .orderBy(F.col("DaysCount").desc(), F.col("UserID"))
        .limit(5)
        .select("UserID", "DaysCount")
    )


def calculate_weeks_with_data(input_df):
    """Count, per user, the weeks (Monday-Sunday) with more than 100 data points.

    Uses the "DateTimestamp" column when present; otherwise the timestamp is
    derived by parsing "Date" + "Time" as "yyyy-MM-dd HH:mm:ss". Rows with a
    null timestamp or null "UserID" are ignored.

    Args:
        input_df: Spark DataFrame with "UserID" and either "DateTimestamp" or
            "Date"/"Time" columns.

    Returns:
        DataFrame with columns "UserID" and "WeeksCount"; users without any
        qualifying week are not listed.
    """
    if "DateTimestamp" in input_df.columns:
        ts = F.col("DateTimestamp")
    else:
        ts = F.unix_timestamp(
            F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
        ).cast("timestamp")

    # date_trunc("week") maps a timestamp to the Monday 00:00 starting its week,
    # a unique week key even across New Year (year() + ISO weekofyear() is not).
    week_start = F.date_trunc("week", ts)

    # Total number of data points per user per week.
    weekly_counts = (
        input_df
        .where(F.col("UserID").isNotNull() & week_start.isNotNull())
        .groupBy("UserID", week_start.alias("WeekStart"))
        .agg(F.count(F.lit(1)).alias("Points"))
    )

    return (
        weekly_counts
        .where(F.col("Points") > 100)
        .groupBy("UserID")
        .agg(F.count(F.lit(1)).alias("WeeksCount"))
    )



# Create (or reuse) the Spark session.
spark = SparkSession.builder.appName("TimeConversion").getOrCreate()

# Path to the input data file; replace it if the data lives elsewhere.
file_path = "/content/drive/MyDrive/Colab Notebooks/dataset.txt"
raw_data = spark.read.option("header", "true").csv(file_path)

# Q1: shift Date/Time to GMT+8 and recompute the fractional-day Timestamp.
q1 = convert_time_to_gmt8(raw_data)
q1.show()

# Q2: top five users by number of days with at least five data points.
q2 = top_five_users(q1)
q2.show()

# Q3 works on a parsed "DateTimestamp" column, which Q1 does not produce;
# derive it from the converted Date and Time before calling it.
q1_with_datetime = q1.withColumn(
    "DateTimestamp",
    F.unix_timestamp(
        F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
    ).cast("timestamp"),
)
q3 = calculate_weeks_with_data(q1_with_datetime)
q3.show()

# Q1_F

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

def convert_time_to_gmt8(input_df):
    """Shift the "Date"/"Time" columns forward by 8 hours (GMT -> GMT+8).

    "Date" ("yyyy-MM-dd") and "Time" ("HH:mm:ss") are combined, parsed once,
    shifted by 8 hours and written back in the same formats. "Timestamp" is
    (re)computed as the number of days, with fractional part, elapsed since
    1899-12-30 00:00:00 for the shifted date-time. Rows whose Date/Time cannot
    be parsed end up with nulls in these three columns.

    Args:
        input_df: Spark DataFrame with string columns "Date" and "Time".

    Returns:
        A new DataFrame with "Date", "Time" and "Timestamp" replaced (or
        "Timestamp" appended if it did not exist).
    """
    # Temporary column holding the shifted timestamp, so that Date, Time and
    # Timestamp are all derived from a single parse of the ORIGINAL Date/Time.
    shifted_col = "__gmt8_ts"

    # NOTE: parsing/formatting uses the Spark session time zone; the shift is an
    # exact 8 wall-clock hours only if that zone has no DST transitions (e.g. UTC).
    shifted_df = input_df.withColumn(
        shifted_col,
        F.unix_timestamp(
            F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
        ).cast("timestamp")
        + F.expr("INTERVAL 8 HOURS"),
    )
    shifted = F.col(shifted_col)

    # Wall-clock seconds since midnight of the shifted time.
    seconds_of_day = F.hour(shifted) * 3600 + F.minute(shifted) * 60 + F.second(shifted)

    # Whole calendar days since the base date plus the fraction of the day.
    # Using calendar fields avoids unix_timestamp("1899-12-30 00:00:00"), whose
    # value depends on the session time zone's historical (LMT) offset.
    days_since_base = (
        F.datediff(F.to_date(shifted), F.lit("1899-12-30")) + seconds_of_day / 86400
    )

    return (
        shifted_df
        .withColumn("Date", F.date_format(shifted, "yyyy-MM-dd"))
        .withColumn("Time", F.date_format(shifted, "HH:mm:ss"))
        .withColumn("Timestamp", days_since_base)
        .drop(shifted_col)
    )


# Example usage. Create (or reuse) the Spark session here so this cell does not
# depend on a `spark` variable defined in an earlier cell.
spark = SparkSession.builder.appName("TimeConversion").getOrCreate()

# Path to the input data file; replace it if the data lives elsewhere.
file_path = "/content/drive/MyDrive/Colab Notebooks/dataset.txt"
raw_data = spark.read.option("header", "true").csv(file_path)

# Apply the GMT+8 conversion.
result_data = convert_time_to_gmt8(raw_data)

# Display the result.
result_data.show()


# Q2

In [None]:
def top_five_users(input_df):
    """Return the five users with the most days having at least five data points.

    A row's day is the calendar date of its "Date" + "Time" parsed as
    "yyyy-MM-dd HH:mm:ss". Rows with an unparseable date-time or a null
    "UserID" are ignored. Users are ranked by that day count, descending, with
    ties broken by ascending "UserID".

    Args:
        input_df: Spark DataFrame with "UserID", "Date" and "Time" columns.

    Returns:
        DataFrame with columns "UserID" and "DaysCount" (at most five rows),
        ordered by the ranking above.
    """
    day = F.to_date(
        F.unix_timestamp(
            F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
        ).cast("timestamp")
    )

    # Total number of data points per user per day (a plain aggregate, not a
    # running window count).
    daily_counts = (
        input_df
        .where(F.col("UserID").isNotNull() & day.isNotNull())
        .groupBy("UserID", day.alias("Day"))
        .agg(F.count(F.lit(1)).alias("Points"))
    )

    # Number of days with at least five points, per user.
    active_days = (
        daily_counts
        .where(F.col("Points") >= 5)
        .groupBy("UserID")
        .agg(F.count(F.lit(1)).alias("DaysCount"))
    )

    # UserID as tie-breaker keeps the top-5 selection deterministic.
    top_users = (
        active_days
        .orderBy(F.col("DaysCount").desc(), F.col("UserID"))
        .limit(5)
        .select("UserID", "DaysCount")
    )
    return top_users


# Q3

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def calculate_weeks_with_data(input_df):
    """Count, per user, the weeks (Monday-Sunday) with more than 100 data points.

    Uses the "DateTimestamp" column when present; otherwise the timestamp is
    derived by parsing "Date" + "Time" as "yyyy-MM-dd HH:mm:ss". Rows with a
    null timestamp or null "UserID" are ignored. The result is shown and also
    returned so it can be reused.

    Args:
        input_df: Spark DataFrame with "UserID" and either "DateTimestamp" or
            "Date"/"Time" columns.

    Returns:
        DataFrame with columns "UserID" and "WeeksCount"; users without any
        qualifying week are not listed.
    """
    if "DateTimestamp" in input_df.columns:
        ts = F.col("DateTimestamp")
    else:
        ts = F.unix_timestamp(
            F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
        ).cast("timestamp")

    # date_trunc("week") maps a timestamp to the Monday 00:00 starting its week,
    # a unique week key even across New Year (year() + ISO weekofyear() is not).
    week_start = F.date_trunc("week", ts)

    # Total number of data points per user per week.
    weekly_counts = (
        input_df
        .where(F.col("UserID").isNotNull() & week_start.isNotNull())
        .groupBy("UserID", week_start.alias("WeekStart"))
        .agg(F.count(F.lit(1)).alias("Points"))
    )

    user_weeks_count = (
        weekly_counts
        .where(F.col("Points") > 100)
        .groupBy("UserID")
        .agg(F.count(F.lit(1)).alias("WeeksCount"))
    )

    user_weeks_count.show()
    return user_weeks_count

# Create (or reuse) the Spark session.
spark = SparkSession.builder.appName("UserDataAnalysis").getOrCreate()

# Read the dataset (same file as the other cells).
file_path = "/content/drive/MyDrive/Colab Notebooks/dataset.txt"
raw_data = spark.read.option("header", "true").csv(file_path)

# Shift Date/Time to GMT+8 first, as the Main cell does before Q3.
# convert_time_to_gmt8 is defined in an earlier cell of this notebook.
converted_data = convert_time_to_gmt8(raw_data)

# Build the "DateTimestamp" timestamp column from the converted Date and Time.
# F.col is used because this cell does not import `col`.
data_with_datetime = converted_data.withColumn(
    "DateTimestamp",
    F.unix_timestamp(
        F.concat_ws(" ", F.col("Date"), F.col("Time")), "yyyy-MM-dd HH:mm:ss"
    ).cast("timestamp"),
)

# Count, per user, the weeks with more than 100 data points.
calculate_weeks_with_data(data_with_datetime)
