Analyzing mobile device usage and user behavior with Apache Beam.

In [None]:
# Mount Google Drive at /content/drive so the dataset stored there can be read
# (google.colab is only importable inside a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install apache-beam -qq

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.7/15.7 MB[0m [31m51.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m55.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.8/40.8 MB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

In [None]:
# Location of the input CSV on the mounted Drive; passed to ReadFromText in the pipeline cell.
mobile_usage = '/content/drive/MyDrive/data/user_behavior_dataset.csv'
# Alternative path (presumably for a file uploaded straight into the Colab session):
# mobile_usage = '/content/user_behavior_dataset.csv'

In [None]:
# Parse one CSV data line into a typed record
def parse_csv(row):
    """Convert a single CSV text line into a dict with typed fields.

    Args:
        row: One data line of the CSV file, as a string. Quoted fields are
            handled by the ``csv`` module.

    Returns:
        A dict keyed by column name; numeric columns are cast to int/float,
        the remaining columns are kept as strings.

    Raises:
        IndexError: if the line has fewer than 11 fields.
        ValueError: if a numeric field cannot be converted.
    """
    # Kept as a function-local import, as in the original cell.
    import csv

    # (output key, converter) listed in CSV column order.
    schema = (
        ("UserID", int),
        ("DeviceModel", str),
        ("OS", str),
        ("AppUsageTime", int),
        ("ScreenOnTime", float),
        ("BatteryDrain", float),
        ("NumOfApps", int),
        ("DataUsage", float),
        ("Age", int),
        ("Gender", str),
        ("UserBehaviorClass", int),
    )

    # The reader is fed exactly one line; the first parsed record is returned.
    for fields in csv.reader([row]):
        return {
            key: convert(fields[position])
            for position, (key, convert) in enumerate(schema)
        }

In [None]:
# Emit a (device model, 1) pair so that models can be counted per key downstream
def find_top_models(usage):
    """Map a parsed record to ``(DeviceModel, 1)``.

    Args:
        usage: Mapping with a ``'DeviceModel'`` key.

    Returns:
        A 2-tuple of the device model and the constant 1.
    """
    model = usage['DeviceModel']
    return model, 1

In [None]:
# Minimum and maximum user age across an iterable of records
def get_min_max_age(usage):
    """Return ``(min_age, max_age)`` over the dict records in ``usage``.

    Items that are not dicts are skipped. Each ``'Age'`` value is passed
    through ``int()`` before comparison.

    Args:
        usage: Iterable of records; dict items must have an ``'Age'`` key.

    Returns:
        ``(min_age, max_age)``, or ``(None, None)`` when no dict record
        was found.
    """
    ages = [int(item['Age']) for item in usage if isinstance(item, dict)]
    if not ages:
        return None, None
    return min(ages), max(ages)


In [None]:
# Key each record's data usage by its user behavior class (input to the per-class average)
def user_map_class(usage):
    """Map a parsed record to ``(UserBehaviorClass, DataUsage)``.

    Args:
        usage: Mapping with ``'UserBehaviorClass'`` and ``'DataUsage'`` keys.

    Returns:
        A 2-tuple ``(behavior_class, data_usage)``.
    """
    behavior_class = usage['UserBehaviorClass']
    data_usage = usage['DataUsage']
    return behavior_class, data_usage

class average_data_usage(beam.CombineFn):
    """CombineFn that computes the arithmetic mean of data-usage values.

    The accumulator is a ``(running_sum, count)`` tuple.
    """

    def create_accumulator(self):
        """Return the empty accumulator."""
        return (0.0, 0)  # (sum of data usage, count)

    def add_input(self, accumulator, data_usage):
        """Fold one value into the accumulator.

        A tuple input is tolerated: only its first item is added.
        """
        if isinstance(data_usage, tuple):
            data_usage = data_usage[0]
        running_sum, count = accumulator
        return running_sum + data_usage, count + 1

    def merge_accumulators(self, accumulators):
        """Merge partial ``(sum, count)`` accumulators into one.

        The iterable is consumed in a single pass, so the result is also
        correct when ``accumulators`` cannot be iterated a second time
        (a second pass over an exhausted iterator would yield a count of 0).
        """
        sum_data_usage, count = 0.0, 0
        for partial_sum, partial_count in accumulators:
            sum_data_usage += partial_sum
            count += partial_count
        return (sum_data_usage, count)

    def extract_output(self, accumulator):
        """Return the mean, or NaN when no values were accumulated."""
        (sum_data_usage, count) = accumulator
        return sum_data_usage / count if count > 0 else float('NaN')


In [None]:
# Key each record's battery drain by its device model (input to the per-model average)
def map_battery_drain(usage):
    """Map a parsed record to ``(DeviceModel, BatteryDrain)``.

    Args:
        usage: Mapping with ``'DeviceModel'`` and ``'BatteryDrain'`` keys.

    Returns:
        A 2-tuple ``(device_model, battery_drain)``.
    """
    device_model = usage['DeviceModel']
    battery_drain = usage['BatteryDrain']
    return device_model, battery_drain

class average_battery_drain(beam.CombineFn):
    """CombineFn that computes the arithmetic mean of battery-drain values.

    The accumulator is a ``(running_sum, count)`` tuple.
    """

    def create_accumulator(self):
        """Return the empty accumulator."""
        return (0.0, 0)  # (sum of battery drain, count)

    def add_input(self, accumulator, battery_drain):
        """Fold one value into the accumulator.

        A tuple input is tolerated: only its first item is added.
        """
        if isinstance(battery_drain, tuple):
            battery_drain = battery_drain[0]
        running_sum, count = accumulator
        return running_sum + battery_drain, count + 1

    def merge_accumulators(self, accumulators):
        """Merge partial ``(sum, count)`` accumulators into one.

        The iterable is consumed in a single pass, so the result is also
        correct when ``accumulators`` cannot be iterated a second time
        (a second pass over an exhausted iterator would yield a count of 0).
        """
        sum_battery_drain, count = 0.0, 0
        for partial_sum, partial_count in accumulators:
            sum_battery_drain += partial_sum
            count += partial_count
        return (sum_battery_drain, count)

    def extract_output(self, accumulator):
        """Return the mean, or NaN when no values were accumulated."""
        (sum_battery_drain, count) = accumulator
        return sum_battery_drain / count if count > 0 else float('NaN')


In [None]:
# Pull the age out of a parsed record
def get_age(usage):
    """Return the ``'Age'`` value of a record.

    Args:
        usage: Mapping with an ``'Age'`` key.
    """
    age = usage['Age']
    return age

class AgeRangeCombineFn(beam.CombineFn):
    """CombineFn that tracks the smallest and largest age seen.

    The accumulator is a ``(min_age, max_age)`` tuple. It starts at
    ``(+inf, -inf)``, which is also what ``extract_output`` returns when no
    input was added.
    """

    def create_accumulator(self):
        """Return the identity accumulator for min/max."""
        return float('inf'), float('-inf')  # (youngest age, oldest age)

    def add_input(self, accumulator, age):
        """Widen the range so that it includes ``age``."""
        min_age, max_age = accumulator
        return min(min_age, age), max(max_age, age)

    def merge_accumulators(self, accumulators):
        """Merge partial ``(min, max)`` accumulators into one.

        The iterable is consumed in a single pass, starting from the identity
        accumulator. This works when ``accumulators`` cannot be iterated twice
        and does not raise when it is empty.
        """
        min_age, max_age = self.create_accumulator()
        for partial_min, partial_max in accumulators:
            min_age = min(min_age, partial_min)
            max_age = max(max_age, partial_max)
        return min_age, max_age

    def extract_output(self, accumulator):
        """Return the ``(min_age, max_age)`` accumulator unchanged."""
        return accumulator

In [None]:
# Turn a (min_age, max_age) pair into two human-readable strings
def format_age_range(age_range):
    """Format an age range as a two-item list of labelled strings.

    Args:
        age_range: Iterable of exactly two items ``(min_age, max_age)``.

    Returns:
        ``["Youngest Age: <min>", "Oldest Age: <max>"]``.
    """
    youngest, oldest = age_range
    lines = [f"Youngest Age: {youngest}"]
    lines.append(f"Oldest Age: {oldest}")
    return lines

In [None]:
# Emit a (gender, 1) pair so that users can be counted per gender
def gender_map_class(usage):
    """Map a parsed record to ``(Gender, 1)``.

    Args:
        usage: Mapping with a ``'Gender'`` key.

    Returns:
        A 2-tuple of the gender value and the constant 1.
    """
    gender = usage['Gender']
    return gender, 1

In [None]:
# Pair each user ID with that user's app usage time
def app_map_class(usage):
    """Map a parsed record to ``(UserID, AppUsageTime)``.

    Args:
        usage: Mapping with ``'UserID'`` and ``'AppUsageTime'`` keys.

    Returns:
        A 2-tuple ``(user_id, app_usage_time)``.
    """
    user_id = usage['UserID']
    app_usage_time = usage['AppUsageTime']
    return user_id, app_usage_time

In [None]:
options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
    # Read the CSV (skipping the header line) and parse every line into a dict.
    usage = (
        pipeline
        | "Read CSV" >> beam.io.ReadFromText(mobile_usage, skip_header_lines=1)
        | "Parse CSV" >> beam.Map(parse_csv)
    )

    # 1. Top 5 device models by number of users
    top_device_models = (
        usage
        | "Get Device models" >> beam.Map(find_top_models)
        | "Count Device models" >> beam.CombinePerKey(sum)
        | "Top 5 Device models" >> beam.combiners.Top.Of(5, key=lambda devicemodels: devicemodels[1])
        | "Write Top Device Models" >> beam.io.WriteToText('top_device_models.txt')
    )

    # 2. Youngest and oldest user age
    user_age_range = (
        usage
        | "Get Age" >> beam.Map(get_age)
        | "Calculate Min and Max ages" >> beam.CombineGlobally(AgeRangeCombineFn())
        | "Format Age Range" >> beam.Map(format_age_range)
        | "Write Age Range" >> beam.io.WriteToText('user_age_range.txt')
    )

    # 3. Average data usage by user behavior class
    data_usage_by_class = (
        usage
        | "Map to User Behavior Class" >> beam.Map(user_map_class)
        | "Average Data Usage" >> beam.CombinePerKey(average_data_usage())
        | "Write Average Data Usage by Class" >> beam.io.WriteToText('data_usage_by_class.txt')
    )

    # 4. Number of users per gender. These are raw counts; turning them into
    #    percentages would additionally require the total number of users.
    gender_distribution = (
        usage
        | "Get Gender" >> beam.Map(gender_map_class)
        # CombinePerKey(sum) yields the same (gender, count) pairs as
        # GroupByKey followed by summing each group, without first collecting
        # every value of a key.
        | "Count Gender" >> beam.CombinePerKey(sum)
        | "Write Gender Distribution" >> beam.io.WriteToText('gender_distribution.txt')
    )

    # 5. Top 10 users by app usage time
    power_users = (
        usage
        | "Map UserID and AppUsageTime" >> beam.Map(app_map_class)
        | "Get Top 10 Users" >> beam.combiners.Top.Of(10, key=lambda users: users[1])
        | "Flatten Top 10 List" >> beam.FlatMap(lambda users: users)  # Top.Of emits one list; emit its items individually
        # Echo each (user id, usage time) pair to the cell output, then pass it on unchanged.
        | "Print All Users" >> beam.Map(lambda user: print(user) or user)
        | "Extract User IDs Only" >> beam.Map(lambda userid: userid[0])
        | "Write Power Users" >> beam.io.WriteToText('power_users.txt')
    )

    # 6. Average battery drain by device model (currently disabled)
    # battery_drain_by_model = (
    #     usage
    #     | "Map Device Model and Battery Drain" >> beam.Map(map_battery_drain)
    #     | "Average by Device Model" >> beam.CombinePerKey(average_battery_drain())
    #     | "Write Average Battery Drain" >> beam.io.WriteToText('battery_drain_by_model.txt')
    # )



(368, 598)
(185, 597)
(342, 597)
(167, 595)
(655, 594)
(35, 593)
(538, 593)
(540, 592)
(452, 591)
(531, 589)


In [None]:
# Toy input for experimenting with Top.Of: the pairs (0, 0), (1, 1), ..., (19, 19)
data = list(zip(range(20), range(20)))
data

[(0, 0),
 (1, 1),
 (2, 2),
 (3, 3),
 (4, 4),
 (5, 5),
 (6, 6),
 (7, 7),
 (8, 8),
 (9, 9),
 (10, 10),
 (11, 11),
 (12, 12),
 (13, 13),
 (14, 14),
 (15, 15),
 (16, 16),
 (17, 17),
 (18, 18),
 (19, 19)]

In [None]:
# Apply Top.Of directly to the in-memory list; the result holds one element,
# the list of the 10 pairs with the largest second item.
output = data | beam.combiners.Top.Of(10, key=lambda pair: pair[1])
output[0]



[(19, 19),
 (18, 18),
 (17, 17),
 (16, 16),
 (15, 15),
 (14, 14),
 (13, 13),
 (12, 12),
 (11, 11),
 (10, 10)]

In [None]:
# Flatten the single top-10 list back into individual pairs
output | beam.FlatMap(lambda top_list: top_list)



[(19, 19),
 (18, 18),
 (17, 17),
 (16, 16),
 (15, 15),
 (14, 14),
 (13, 13),
 (12, 12),
 (11, 11),
 (10, 10)]