In [30]:
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import RuntimeExecutionMode
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream.functions import ReduceFunction
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import RuntimeContext
from datetime import datetime
import json

class JsonObjectMapFunction(MapFunction):
    def map(self, value):
        return json.loads(value)

def read_json_as_datastream(file_path: str, env: StreamExecutionEnvironment):
    source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), file_path).build()
    data_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "txt_source")    
    parsed_stream = data_stream.map(JsonObjectMapFunction())
    return parsed_stream

In [2]:
import os

In [3]:
os.getcwd()

'/Users/chuqiwang/Documents/UCI/CS224P/assignments/HW6'

In [72]:
# Path of the data files, change it accordingly
records_path = "zot-music-dataset-hw6/Records.jsonl"
review_likes_path = "zot-music-dataset-hw6/ReviewLikes.jsonl"
reviews_path = "zot-music-dataset-hw6/Reviews.jsonl"
sessions_path = "zot-music-dataset-hw6/Sessions.jsonl"
users_path = "zot-music-dataset-hw6/Users.jsonl"

In [5]:
# Q1.a
# Your Answer: 
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# Your code here
user_stream = read_json_as_datastream(users_path, env)
class PrintTypeMapFunction(MapFunction):
    def map(self, value):
        print(f"Data: {value}, Type: {type(value)}")
        return value
user_stream = user_stream.map(PrintTypeMapFunction())
env.execute("Q1.a")

Data: {'user_id': 'user_vjWSqut3SK-2', 'email': 'anna49@college.edu', 'joined_date': '2015-01-26', 'nickname': 'anna49', 'address': {'street': '247 Acevedo Overpass Suite 623', 'state': 'Michigan', 'zip': '31431'}, 'genres': ['Jazz', 'Disco', 'Hip-Hop', 'Folk'], 'is_listener': True, 'is_artist': False, 'subscription': 'monthly', 'real_name': {'first_name': 'Michael', 'last_name': 'Ramirez'}}, Type: <class 'dict'>
Data: {'user_id': 'user_ECQlpCSMQfKm', 'email': 'zwarren@foxmail.com', 'joined_date': '2015-12-26', 'nickname': 'zwarren', 'address': {'street': '649 Michael Fork Apt. 573', 'city': 'Ashleyshire', 'zip': '62250'}, 'genres': ['Classical', 'Funk', 'Country', 'Disco'], 'is_listener': True, 'is_artist': True, 'subscription': 'monthly', 'real_name': {'first_name': 'Thomas', 'last_name': 'Holmes'}, 'stage_name': 'panderson', 'bio': 'Quality morning fire rule open. Keep continue attack want school certainly. Special check many listen travel.\nSort continue thank cover window turn maj

<pyflink.common.job_execution_result.JobExecutionResult at 0x1375b1ed0>

In [75]:
# Q1.a
# From the outout of Q1.a, the type is Python dictionary.

In [7]:
# Q1.b
# Your Answer:

# The function `read_json_as_datastream` reads a file containing JSON objects (one per line) 
# into a PyFlink data stream using the provided file path and execution environment.
# 
# The class `JsonObjectMapFunction` is a custom implementation of the PyFlink `MapFunction` 
# interface. It processes each line of the file (received as a string) and converts it 
# into a Python dictionary using the `json.loads()` method. 
# This conversion is necessary because PyFlink reads data as plain text by default, 
# and the JSON strings need to be parsed into structured Python objects for downstream processing.
# 
# Example Workflow:
# 1. `env.read_text_file(file_path)` reads the file line-by-line as strings.
# 2. `JsonObjectMapFunction.map()` is applied to each line to parse it into a dictionary.
# 3. The resulting data stream consists of Python dictionaries, ready for further processing.

In [8]:
# Q1.c
# Your Answer: 

# The annotation (e.g., 3 > some data here) in the output indicates the parallel task ID (3) followed by 
# the processed data. The number identifies which parallel instance of the operator produced the output, 
# useful for debugging and understanding data distribution in Flink's distributed processing.

In [9]:
# Q2.a DataStream API
env = StreamExecutionEnvironment.get_execution_environment()
# Your code here
source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), records_path).build()
data_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "records_source")

filtered_stream = data_stream \
    .map(lambda value: json.loads(value)) \
    .filter(lambda record: record["genre"] == "R&B" and record["is_album"] and len(record["songs"]) >= 12) \
    .map(lambda record: {
        "record_id": record["record_id"],
        "title": record["title"],
        "genre": record["genre"],
        "number_of_songs": len(record["songs"])
    })

filtered_stream.print()
env.execute("Q2.a DataStream API")


3> {'record_id': 'record_KhNPrSUbSj--', 'title': 'Collection city service', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_Moz2GxQ1SvSk', 'title': 'Plant son lead', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_OR2mRtPRS-6B', 'title': 'Next', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_rjHeikQCTyqg', 'title': 'Office issue', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_Ic34zxUJS0ic', 'title': 'Education eye say', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_NaXmzSELRnii', 'title': 'Director way north', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_CaaF5m1MQbiF', 'title': 'Always home', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_ZHbWPiQWQTaB', 'title': 'Western city painting adult', 'genre': 'R&B', 'number_of_songs': 12}
3> {'record_id': 'record_T8luoEYGSgaI', 'title': 'Attention miss', 'genre': 'R&B', 'number_of_songs': 12}


<pyflink.common.job_execution_result.JobExecutionResult at 0x105d65780>

In [10]:
# Q2.a Table API

# Your code here
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql(f"""
    CREATE TABLE Records (
        record_id STRING,
        title STRING,
        genre STRING,
        is_album BOOLEAN,
        songs ARRAY<MAP<STRING, STRING>>
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{records_path}',
        'format' = 'json'
    )
""")

result_table = t_env.sql_query("""
    SELECT
        record_id,
        title,
        genre,
        CARDINALITY(songs) AS number_of_songs
    FROM Records
    WHERE genre = 'R&B'
      AND is_album
      AND CARDINALITY(songs) >= 12
""")

result_table.execute().print()


+----+--------------------------------+--------------------------------+--------------------------------+-----------------+
| op |                      record_id |                          title |                          genre | number_of_songs |
+----+--------------------------------+--------------------------------+--------------------------------+-----------------+
| +I |            record_KhNPrSUbSj-- |        Collection city service |                            R&B |              12 |
| +I |            record_Moz2GxQ1SvSk |                 Plant son lead |                            R&B |              12 |
| +I |            record_OR2mRtPRS-6B |                           Next |                            R&B |              12 |
| +I |            record_rjHeikQCTyqg |                   Office issue |                            R&B |              12 |
| +I |            record_Ic34zxUJS0ic |              Education eye say |                            R&B |              12 |
| +I |  

In [11]:
# Q2.b DataStream API
env = StreamExecutionEnvironment.get_execution_environment()

# Your code here
filtered_stream = (
    env.from_source(
        FileSource.for_record_stream_format(StreamFormat.text_line_format(), review_likes_path).build(),
        WatermarkStrategy.no_watermarks(),
        "file_source"
    )
    .map(lambda line: json.loads(line))
    .filter(lambda x: x['user_id'] == "user_G91ZrXr4QOuT")
)

final_count = sum(1 for _ in filtered_stream.execute_and_collect())

print(f"Final like count for user 'user_G91ZrXr4QOuT': {final_count}")


Final like count for user 'user_G91ZrXr4QOuT': 536


In [12]:
# Q2.b Table API

# Your code here
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = TableEnvironment.create(env_settings)


table_env.execute_sql(f"""
    CREATE TABLE ReviewLikes (
        user_id STRING,
        review_id STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{review_likes_path}',
        'format' = 'json'
    )
""")

result_table = table_env.sql_query("""
    SELECT COUNT(*) AS like_count
    FROM ReviewLikes
    WHERE user_id = 'user_G91ZrXr4QOuT'
""")

result_table.execute().print()


+----------------------+
|           like_count |
+----------------------+
|                  536 |
+----------------------+
1 row in set


In [18]:
# Q2.c DataStream API
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
env.set_parallelism(1)

# Your code here
def calculate_session_duration(record):
    initiate_time = datetime.strptime(record["session_duration"]["initiate_at"], "%Y-%m-%d %H:%M:%S")
    leave_time = datetime.strptime(record["session_duration"]["leave_at"], "%Y-%m-%d %H:%M:%S")
    return {
        "user_id": record["user_id"],
        "session_duration_minutes": (leave_time - initiate_time).total_seconds() / 60
    }

class AggregateSessionDuration(ReduceFunction):
    def reduce(self, record1, record2):
        record1["session_duration_minutes"] += record2["session_duration_minutes"]
        return record1

result_stream = (
    env.read_text_file(sessions_path)
    .map(lambda line: calculate_session_duration(json.loads(line)))
    .filter(lambda record: record["user_id"] == "user_wQD7tpOzS564")
    .key_by(lambda record: record["user_id"])
    .reduce(AggregateSessionDuration())
)

result_stream.print()
env.execute("Q2.c DataStream API")

{'user_id': 'user_wQD7tpOzS564', 'session_duration_minutes': 1099.7499999999998}


<pyflink.common.job_execution_result.JobExecutionResult at 0x10149a080>

In [16]:
# Q2.c Table API

# Your code here
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(env_settings)

t_env.execute_sql(f"""
    CREATE TABLE sessions (
        user_id STRING,
        session_duration ROW<initiate_at STRING, leave_at STRING>
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{sessions_path}',
        'format' = 'json'
    )
""")

result_table = t_env.sql_query("""
    SELECT 
        user_id, 
        SUM(
            UNIX_TIMESTAMP(session_duration.leave_at) - UNIX_TIMESTAMP(session_duration.initiate_at)
        ) / 60 AS total_session_duration_minutes
    FROM sessions
    WHERE user_id = 'user_wQD7tpOzS564'
    GROUP BY user_id
""")

result_table.execute().print()


+--------------------------------+--------------------------------+
|                        user_id | total_session_duration_minutes |
+--------------------------------+--------------------------------+
|              user_wQD7tpOzS564 |                           1099 |
+--------------------------------+--------------------------------+
1 row in set


In [31]:
# Q2.d
env = StreamExecutionEnvironment.get_execution_environment()

# Your code here
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

def load_stream(file_path):
    return env.from_source(
        FileSource.for_record_stream_format(StreamFormat.text_line_format(), file_path).build(),
        WatermarkStrategy.no_watermarks(),
        "file_source"
    )

def parse_json(record):
    try:
        parsed_record = json.loads(record)
        if "song" in parsed_record and "record_id" in parsed_record["song"]:
            return {
                "record_id": parsed_record["song"]["record_id"],
                "replay_count": parsed_record.get("replay_count", 0),
            }
    except json.JSONDecodeError:
        pass
    return None

class ReplayAccumulator(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.total_replays = runtime_context.get_state(ValueStateDescriptor("total_replays", Types.LONG()))
        self.emitted = runtime_context.get_state(ValueStateDescriptor("emitted_flag", Types.BOOLEAN()))

    def process_element(self, value, ctx):
        current_total = (self.total_replays.value() or 0) + value["replay_count"]
        if not (self.emitted.value() or False) and current_total > 1000:
            self.emitted.update(True)
            yield value["record_id"]
        self.total_replays.update(current_total)

raw_stream = load_stream(sessions_path)
results_stream = (
    raw_stream
    .map(parse_json)
    .filter(lambda x: x is not None)
    .key_by(lambda record: record["record_id"])
    .process(ReplayAccumulator())
)

for record_id in results_stream.execute_and_collect():
    print(record_id)


record_eynz2XeqRvCC
record_qIENcovfSFWX


In [74]:
# Q2.e
# Your code here
from pyflink.common.time import Time
from pyflink.datastream.functions import KeySelector, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows


class SessionParser:
    @staticmethod
    def parse(json_line):
        record = json.loads(json_line)
        initiated_at = record.get('session_duration', {}).get('initiate_at', "1970-01-01 00:00:00")
        record['event_time'] = int(datetime.strptime(initiated_at, "%Y-%m-%d %H:%M:%S").timestamp())
        return record

class DateExtractor(KeySelector):
    def get_key(self, record):
        return datetime.utcfromtimestamp(record['event_time']).strftime('%Y-%m-%d')

class DailySessionCounter(ProcessWindowFunction):
    def process(self, key, context, elements):
        session_count = len(elements)
        if session_count >= 70:
            yield {"date": key, "session_count": session_count}

class TimestampAssigner:
    def extract_timestamp(self, record, record_timestamp):
        return record['event_time'] * 1000

def setup_environment():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    return env

def build_dataflow():
    environment = setup_environment()
    raw_data = environment.read_text_file(sessions_path)
    parsed_data = raw_data.map(SessionParser.parse)
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
        TimestampAssigner()
    )
    timestamped_data = parsed_data.assign_timestamps_and_watermarks(watermark_strategy)

    results = timestamped_data \
        .key_by(DateExtractor()) \
        .window(TumblingEventTimeWindows.of(Time.days(1))) \
        .process(DailySessionCounter())

    return results

def main():
    result_stream = build_dataflow()
    collected_results = list(result_stream.execute_and_collect())
    print("Session Count Analysis Results:")
    for result in collected_results:
        print(f"Date: {result['date']}, Session Count: {result['session_count']}")

if __name__ == "__main__":
    main()


Session Count Analysis Results:
Date: 2023-03-03, Session Count: 74
Date: 2023-06-12, Session Count: 71
Date: 2023-11-11, Session Count: 78
Date: 2023-11-13, Session Count: 72
Date: 2024-03-15, Session Count: 73
Date: 2024-06-28, Session Count: 72
Date: 2024-08-12, Session Count: 71
Date: 2024-10-08, Session Count: 70
